copy yet unreleased code into project #200

Merged
cmo merged 1 commit from 5.6.0-alpha1 into master 2026-03-19 12:14:18 +00:00
2 changed files with 301 additions and 1 deletions

View file

@ -13,7 +13,6 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
import yaml import yaml
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo from datalad_core.repo import Repo
from datalad_core.runners import ( from datalad_core.runners import (
call_git, call_git,
@ -21,6 +20,7 @@ from datalad_core.runners import (
) )
from . import AuditBackend from . import AuditBackend
from .gitutils import apply_changeset
class GitAuditBackend(AuditBackend): class GitAuditBackend(AuditBackend):

View file

@ -0,0 +1,300 @@
# This is taken from the minilad-branch of `datalad-core`, i.e.:
#
# https://hub.datalad.org/datalad/datalad-core/src/branch/minilad/datalad_core/git_utils/apply_changeset.py#
#
# which is not yet released. Once the `minilad` branch is merged, this
# module should be removed and `apply_changeset` should be imported from
# `datalad_core.git_utils` instead.
import os
import tempfile
from collections.abc import Mapping
from pathlib import (
Path,
PurePosixPath,
)
from datalad_core.git_utils import types as gt
from datalad_core.git_utils.interrogators import get_object_name
from datalad_core.git_utils.status import iter_repo_status
from datalad_core.repo import (
Repo,
Worktree,
)
from datalad_core.runners import (
call_git,
call_git_oneline,
)
def apply_changeset(
    target: Repo | Worktree,
    changes: Mapping[
        PurePosixPath | str, None | str | Path | tuple[gt.GitObjectMode, str]
    ],
    *,
    message: str,
    branch: str | None = None,
    force: bool = False,
    restage: bool = False,
) -> str | None:
    """Apply a changeset to a (bare) repository

    The ``target`` parameter identifies the repository or worktree
    to apply the changeset to. When an effective change was made,
    it is committed with ``message`` as the commit message.

    When the changeset is not to be applied on top of ``HEAD``,
    a different ref can be given via ``branch``. ``HEAD`` need not
    exist for using this function (i.e., can be used for an initial
    commit too). A given ``branch``, however, must be resolvable.

    A changeset (``changes``) is a mapping from a path (relative to the
    repository root), to change specification. The following specification
    values are supported:

    - ``None``: remove content
    - any ``str``-type value: point to (new) blob created from this string
    - any ``Path`` instance: point to (new) blob created from the content
      of this file
    - ``tuple[GitObjectMode, str]``: point to Git object of a mode given
      by the first tuple item. The nature of the second value is determined
      by the object mode:

      - file|executable: <object name>
      - symlink: <target path>
      - tree: <object name>
      - submodule: <subproject commit>

    When ``target`` is a worktree, it is inspected before any change is
    made. Without ``force``, the changeset is refused if it touches
    untracked content, or if the worktree has unstaged modifications.
    With ``restage``, content that was staged as added or modified before
    the call is staged again after the index was synced with the new
    commit.

    Returns the ID of a created commit, or ``None`` if no commit was made
    (empty changeset, or the resulting tree matches that of the parent).

    Removed files are not deleted from a worktree, but are left as untracked
    content.

    Raises ``ValueError`` when ``branch`` is given but cannot be resolved,
    when the changeset is refused due to the worktree state, or when a
    change specification is not supported. Raises ``TypeError`` for a
    changeset path that is neither ``str`` nor ``PurePosixPath``.
    """
    if not changes:
        # nothing to do, early exit
        return None

    repo = target.repo if isinstance(target, Worktree) else target

    # look for any parent commit. will fail with an unknown branch
    # (should have created it before)
    try:
        parent = get_object_name(repo.path, branch or 'HEAD')
    except ValueError:
        if branch:
            # only tolerate an absent HEAD
            raise
        parent = None

    # 1. If not bare, check for conflicts and capture the staged state of
    # the index to be able to restage content.
    # restage items have the same format as `index_info`, directly prepared
    # for git-update-index
    restage_items = _check_for_conflicts(
        target, changes, branch=branch, force=force, restage=restage
    )

    # 2. Create a temporary index to build the commit. The actual index
    # (if there is any) is left untouched until the commit exists.
    with tempfile.TemporaryDirectory(
        prefix='index',
        dir=target.git_dir,
    ) as tmpdir:
        index_file = Path(tmpdir) / 'index'
        env = dict(os.environ, GIT_INDEX_FILE=str(index_file))

        # 3. Read any parent state into the TMP index
        if parent:
            call_git(['read-tree', '-q', parent], env=env, cwd=repo.path)

        # 4. Apply the changes
        index_info: list[str] = []
        for path, spec in changes.items():
            _prep_update_item(
                cwd=repo.path,
                env=env,
                index_info=index_info,
                path=path,
                spec=spec,
            )
        call_git(
            ['update-index', '-q', '-z', '--index-info'],
            inputs='\0'.join(index_info),
            env=env,
            text=True,
            cwd=repo.path,
        )

        # 5. Commit the changes. Writing the tree is the last operation
        # that needs the TMP index.
        tree_id = call_git_oneline(['write-tree'], env=env, cwd=repo.path)

    # avoid empty commit by comparing the tree we ended up with, with the tree
    # linked to the parent state
    if (
        parent
        and call_git_oneline(['rev-parse', f'{parent}^{{tree}}'], cwd=repo.path)
        == tree_id
    ):
        return None

    commit_cmd = ['commit-tree', tree_id, '-m', message]
    if parent:
        commit_cmd.extend(('-p', parent))
    commit_id = call_git_oneline(commit_cmd, cwd=repo.path)

    # 6. Update the ref to point to the new commit
    call_git(
        [
            'update-ref',
            # using HEAD will run with whatever is the default branch name,
            # also works in bare repos
            f'refs/heads/{branch}' if branch else 'HEAD',
            commit_id,
        ],
        cwd=repo.path,
    )

    # NOTE(review): a ``branch`` that equals the checked-out branch also
    # returns here, i.e. index and worktree are not synced with the new
    # commit, although `_check_for_conflicts` does inspect the worktree for
    # that case -- confirm this is intended.
    if isinstance(target, Repo) or branch is not None:
        return commit_id

    # From here on ``target`` is a worktree and ``branch`` is None, hence the
    # commit was made on HEAD.

    # 7. If not bare and not branch, merge commit into index
    # read-tree will unavoidably cause staged content to be unstaged
    call_git(['read-tree', '-m', 'HEAD'], cwd=target.path)

    # 8. If not bare and not branch, restage content.
    # Run in the worktree, like the read-tree call above and the
    # checkout-index call below, so that all three address the same index.
    if restage:
        call_git(
            ['update-index', '-q', '-z', '--index-info'],
            inputs='\0'.join(restage_items),
            text=True,
            cwd=target.path,
        )

    # 9. If not bare and not branch, update checkout
    call_git(['checkout-index', '-f', '-u', '-a'], cwd=target.path)
    return commit_id
def _prep_update_item(
    cwd: Path,
    env: Mapping[str, str],
    index_info: list[str],
    path: PurePosixPath | str,
    spec: None | str | Path | tuple[gt.GitObjectMode, str],
) -> None:
    """Translate one changeset entry into a ``git update-index`` record

    The record that places ``spec`` at ``path`` is appended to
    ``index_info``. String content, file content, and symlink targets are
    first written to the object store with ``git hash-object -w`` (executed
    in ``cwd`` with the environment ``env``) in order to obtain the object
    name to reference. Any other ``(mode, value)`` tuple is passed on
    as-is.

    Raises ``TypeError`` for a ``path`` that is neither ``str`` nor
    ``PurePosixPath``, and ``ValueError`` for an unsupported ``spec``.
    """
    if not isinstance(path, (str, PurePosixPath)):
        msg = f'Unsupported path type in change specification {path!r}'
        raise TypeError(msg)

    def _store_text(text: str) -> str:
        # write ``text`` as a blob and report its object name
        return call_git_oneline(
            ['hash-object', '-t', 'blob', '-w', '--stdin'],
            inputs=text,
            cwd=cwd,
            env=env,
        )

    location = str(path)

    if spec is None:
        # the magic mode '0' requests removal from the index
        null_id = '0' * 40
        record = f'0 {null_id}\t{location}'
    elif isinstance(spec, str):
        blob_id = _store_text(spec)
        record = f'{gt.GitObjectMode.FILE} {blob_id} 0\t{location}'
    elif isinstance(spec, Path):
        hash_cmd = ['hash-object', '-t', 'blob', '-w']
        # --path enables filters (think EOL conversion)
        hash_cmd.append(f'--path={spec}')
        hash_cmd.append(str(spec))
        blob_id = call_git_oneline(hash_cmd, cwd=cwd, env=env)
        record = f'{gt.GitObjectMode.FILE} {blob_id} 0\t{location}'
    elif isinstance(spec, tuple):
        if spec[0] is gt.GitObjectMode.SYMLINK:
            # a symlink is a blob that holds the link target
            blob_id = _store_text(spec[1])
            record = f'{gt.GitObjectMode.SYMLINK} {blob_id} 0\t{location}'
        else:
            # the second item already is the object name to point to
            record = f'{spec[0]} {spec[1]} 0\t{location}'
    else:
        msg = f'Unsupported change specification {spec!r}'
        raise ValueError(msg)

    index_info.append(record)
def _check_for_conflicts(
    target: Repo | Worktree,
    changes: Mapping[
        PurePosixPath | str, None | str | Path | tuple[gt.GitObjectMode, str]
    ],
    *,
    branch: str | None = None,
    force: bool = False,
    restage: bool = False,
) -> list[str]:
    """Vet a worktree for conflicts with a changeset and collect staged content

    Returns records for content that is staged as added or modified, in the
    ``<mode> <object name> 0<TAB><path>`` format that can be fed to
    ``git update-index --index-info``. An empty list is returned, without
    inspecting anything, when ``target`` is not a worktree, when ``force``
    is set while ``restage`` is not, or when ``branch`` names a branch other
    than the one reported by ``git branch --show-current``.

    Without ``force``, a ``ValueError`` is raised when a path of the
    changeset matches untracked content, or when the worktree has any
    unstaged modification.
    """
    # without a worktree there is no chance for conflicts, and with `force`
    # but without restaging, worktree and index state can be ignored entirely
    if not isinstance(target, Worktree) or (force and not restage):
        return []

    if branch:
        checked_out = call_git_oneline(['branch', '--show-current'], cwd=target.path)
        if checked_out != branch:
            # the changeset targets a branch/ref that is not the current
            # branch, it cannot conflict with the worktree
            return []

    records = {}
    for rec in iter_repo_status(
        target.path,
        untracked_files=gt.UntrackedFilesMode.ALL,
        # submodules are of no interest as such, but they need to be
        # reported when their subproject commit is modified in order to
        # be able to detect conflicts
        ignore_submodules=gt.IgnoreSubmodulesMode.DIRTY,
    ):
        records[rec.path] = rec

    modified = [
        rec for rec in records.values() if isinstance(rec, gt.RepoModificationRecord)
    ]

    if not force:
        # refuse when the changeset touches untracked content
        untracked = {
            PurePosixPath(rec.path)
            for rec in records.values()
            if isinstance(rec, gt.RepoUntrackedRecord)
        }
        touched = {PurePosixPath(p) for p in changes}
        if untracked & touched:
            msg = (
                'Refuse to apply changeset with conflicting untracked worktree content'
            )
            raise ValueError(msg)

        # refuse when there are unstaged modifications, they would be lost
        # by the final sync of the worktree with the index
        for rec in modified:
            if rec.modification.unstaged != gt.ModificationStateType.UNMODIFIED:
                msg = (
                    'Refuse to apply changeset to worktree with '
                    'unstaged/uncommitted modifications'
                )
                raise ValueError(msg)

    # same record format as used for `index_info`, ready for git-update-index
    restageable = (
        gt.ModificationStateType.ADDED,
        gt.ModificationStateType.MODIFIED,
    )
    return [
        f'{rec.mode_index} {rec.name_index} 0\t{rec.path}'
        for rec in modified
        if rec.modification.staged in restageable
    ]