dump-things-server/dump_things_service/audit/gitaudit.py
Christian Monch 2d9e0c405d
Some checks failed
Test execution / Test-all (push) Failing after 42s
use apply_change_set
Use the code from the minilad branch of
datalad-core.
2026-03-16 09:13:17 +01:00

196 lines
5.9 KiB
Python

"""A git-based audit backend

The backend minimizes the number of commits by caching changes until a
record that already has pending changes is changed again. At that point,
all pending changes are committed together.

Every change is annotated with a timestamp and a user ID.
"""
import hashlib
from datetime import datetime
from pathlib import Path
import yaml
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo
from datalad_core.runners import (
call_git,
CommandError,
)
class GitAuditBackend:
    """Audit backend that keeps records and their change logs in a git repo.

    Each record is stored as a YAML file and has a companion ``.log`` file
    to which a ``<timestamp> <user_id>`` line is appended for every change.
    Changes are staged in ``current_change_set`` (repository path -> full
    new file content) and committed via ``apply_changeset`` only when a
    record that already has staged changes is modified again, or when
    ``flush`` is called.
    """

    def __init__(
        self,
        path: Path,
    ):
        self.path = path
        # NOTE(review): not read anywhere in this class; kept because
        # external code might access it.
        self.cache = {}
        # Repository-relative path -> complete new file content, committed
        # by ``persist_pending_changes``.
        self.current_change_set = {}
        self.repo = self.init_repo()

    def add_record(
        self,
        record_id: str,
        record: dict,
        user_id: str,
    ):
        """Stage ``record`` as the new version of ``record_id``.

        If ``record_id`` already has staged changes, all staged changes are
        committed first, so a single commit never contains more than one
        change of the same record. The record (plus a log entry for
        ``user_id``) is only staged if it differs from the committed version.
        """
        location = self.get_location_for(record_id)
        if self.has_pending_changes(location):
            self.persist_pending_changes()
        self.add_elements(location, user_id, record)

    def flush(self):
        """Commit all staged changes, if there are any."""
        if self.current_change_set:
            self.persist_pending_changes()

    def get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Return the change history of ``record_id``.

        Staged changes are committed first. The result maps the timestamp of
        each change to a tuple ``(user_id, yaml_diff, yaml_content)``:
        ``yaml_diff`` is the ``git show`` output for the record file in the
        commit that made the change (empty lines removed, newline-terminated)
        and ``yaml_content`` is the record file's content in that commit.
        Entries are inserted in ascending timestamp order. A record without
        history yields an empty dict.
        """
        self.flush()
        _, yaml_path, log_path = self.get_location_for(record_id)
        yaml_location = yaml_path.as_posix()
        log_location = log_path.as_posix()

        # Every commit that touched the log file also wrote the record file
        # (``add_elements`` stages both together), so the log history is the
        # record history.
        commit_hashes = call_git(
            ['log', '--format=%H', '--', log_location],
            cwd=self.path,
            capture_output=True,
        ).decode().splitlines()

        changes = []
        for commit_hash in commit_hashes:
            log_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', log_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            # The log entry is the first added diff line ("+..."), skipping
            # the "+++ b/<path>" file header. ``add_record`` ensures one
            # change per record per commit.
            log_entry = [
                line[1:]
                for line in log_diff_lines
                if line.startswith('+') and not line.startswith('+++')
            ][0]
            time_stamp, user_id = log_entry.split(' ', 1)

            yaml_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', yaml_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            # Drop empty separator lines (e.g. the empty commit body).
            yaml_diff = '\n'.join(line for line in yaml_diff_lines if line) + '\n'

            yaml_content = call_git(
                ['show', f'{commit_hash}:{yaml_location}'],
                cwd=self.path,
                capture_output=True,
            ).decode()
            changes.append((time_stamp, user_id, yaml_diff, yaml_content))

        # NOTE(review): timestamps are compared as strings; this orders
        # correctly only while they share one format and time zone.
        changes.sort()
        return {
            time_stamp: (user_id, yaml_diff, yaml_content)
            for time_stamp, user_id, yaml_diff, yaml_content in changes
        }

    def add_elements(
        self,
        location: tuple[str, Path, Path],
        user_id: str,
        record: dict,
    ) -> bool:
        """Stage ``record`` and a log entry if it differs from the committed one.

        ``location`` is the ``(hash, yaml_path, log_path)`` tuple returned by
        ``get_location_for``. Returns ``True`` if anything was staged,
        ``False`` if the record is unchanged.
        """
        existing_record = self.read_record_from_repo_path(location[1])
        if existing_record == record:
            return False
        self.current_change_set[location[1]] = yaml.dump(
            data=record,
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
        )
        self.add_log_entry(location[2], user_id)
        return True

    def add_log_entry(
        self,
        log_location: Path,
        user_id: str,
    ) -> None:
        """Stage ``log_location`` with a ``<timestamp> <user_id>`` line appended.

        The new line is appended to the committed content of the log file.
        """
        # NOTE(review): ``datetime.now()`` is naive local time; entries written
        # across a DST/time-zone change may sort incorrectly in
        # ``get_audit_log``. Kept unchanged for compatibility with stored logs.
        time_stamp = datetime.now().isoformat()
        log_content = self.read_from_repo_path(log_location).decode()
        log_content += f'{time_stamp} {user_id}\n'
        self.current_change_set[log_location] = log_content

    def read_from_repo_path(
        self,
        path: Path,
    ) -> bytes:
        """Return the content of ``path`` on the ``master`` branch.

        Returns ``b''`` when git exits with status 128 (its status for fatal
        errors such as a path or branch that does not exist); any other git
        failure is re-raised.
        """
        # NOTE(review): reads use ``master`` while ``get_audit_log`` runs
        # ``git log`` on HEAD; they agree only if HEAD points at master and
        # ``apply_changeset`` commits there -- confirm.
        try:
            return call_git(
                ['cat-file', '-p', f'master:{Path(path).as_posix()}'],
                cwd=self.path,
                capture_output=True,
            )
        except CommandError as e:
            if e.returncode == 128:
                return b''
            raise

    def read_record_from_repo_path(
        self,
        path: Path,
    ):
        """Return the committed YAML at ``path`` parsed with ``yaml.safe_load``.

        A missing file reads as ``b''``, which ``safe_load`` turns into ``None``.
        """
        return yaml.safe_load(self.read_from_repo_path(path))

    def has_pending_changes(
        self,
        location: tuple[str, Path, Path],
    ) -> bool:
        """Return whether the record at ``location`` has staged changes.

        Raises ``SystemError`` if only one of the record file and the log
        file is staged; ``add_elements`` always stages both together.
        """
        record_pending = location[1] in self.current_change_set
        log_pending = location[2] in self.current_change_set
        if record_pending != log_pending:
            msg = (
                f'change status mismatch: changed: '
                f'{location[1]} ({record_pending}), '
                f'{location[2]} ({log_pending})'
            )
            raise SystemError(msg)
        return record_pending

    def persist_pending_changes(self) -> None:
        """Commit all staged changes in one commit and clear the stage."""
        apply_changeset(
            self.repo,
            self.current_change_set,
            message='persist changes',
        )
        self.current_change_set = {}

    def get_location_for(
        self,
        record_id: str,
    ) -> tuple[str, Path, Path]:
        """Return ``(hash, yaml_path, log_path)`` for ``record_id``.

        ``hash`` is the SHA-1 hex digest of the id. Files are sharded into
        ``<hash[0:3]>/<hash[3:6]>/`` and named ``<hash>.yaml`` / ``<hash>.log``.
        """
        base = hashlib.sha1(record_id.encode()).hexdigest()
        location_dir = Path(base[0:3]) / base[3:6]
        return (
            base,
            location_dir / (base + '.yaml'),
            location_dir / (base + '.log'),
        )

    @staticmethod
    def _is_empty_dir(path: Path) -> bool:
        """Return whether directory ``path`` has no entries (files or dirs)."""
        return not any(path.iterdir())

    def init_repo(self) -> 'Repo':
        """Open the repository at ``self.path``, creating it if necessary.

        A missing or empty directory is initialized as a bare git repository
        and seeded with a commit that adds ``README.txt``. A non-empty
        directory is assumed to already be such a repository and is opened
        as-is. Sets and returns ``self.repo``.
        """
        if self.path.exists():
            is_empty = self._is_empty_dir(self.path)
        else:
            self.path.mkdir(parents=True)
            is_empty = True

        if not is_empty:
            self.repo = Repo(self.path)
            return self.repo

        # Absolute target: with ``cwd=self.path`` a relative target would be
        # resolved inside ``self.path`` and create a nested repository.
        call_git(['init', '--bare', str(self.path.absolute())], cwd=self.path)
        self.repo = Repo(self.path)
        apply_changeset(
            self.repo,
            {'README.txt': 'A git-based audit backend\n'},
            message='add README.txt',
        )
        return self.repo