Some checks failed
Test execution / Test-all (push) Failing after 42s
Use the code from the minilad branch of datalad-core.
196 lines
5.9 KiB
Python
196 lines
5.9 KiB
Python
"""A git-based audit backend

The backend minimizes commits by caching changes until an already
changed record is changed again. In this case all changes are
committed.

Changes are annotated with a time stamp and a user-id.
"""

import hashlib
from datetime import datetime
from pathlib import Path

import yaml
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo
from datalad_core.runners import (
    call_git,
    CommandError,
)


class GitAuditBackend:
    """Store records and a per-record change log in a bare git repository.

    Every record is serialized as YAML next to a plain-text log file; both
    paths are derived from the SHA-1 of the record id (see
    :meth:`get_location_for`). Changes are collected in
    ``current_change_set`` and committed together. A commit is triggered
    when a record that already has a pending change is changed again, or
    when :meth:`flush` is called.
    """

    def __init__(
        self,
        path: Path,
    ):
        """Open the audit repository at ``path``, creating it if necessary.

        Args:
            path: Directory of the bare git repository.
        """
        self.path = path
        # Not used by any method of this class; kept because it is a
        # public attribute.
        self.cache = {}
        # Maps repository-relative paths to the new file content that the
        # next commit will write.
        self.current_change_set = {}
        self.repo = self.init_repo()

    def add_record(
        self,
        record_id: str,
        record: dict,
        user_id: str,
    ):
        """Queue a new version of a record together with its log entry.

        If the record already has a pending (uncommitted) change, all
        pending changes are committed first, so every version of a record
        ends up in its own commit.

        Args:
            record_id: Identifier of the record.
            record: New content of the record.
            user_id: Identifier of the user responsible for the change.
        """
        location = self.get_location_for(record_id)
        if self.has_pending_changes(location):
            self.persist_pending_changes()
        self.add_elements(location, user_id, record)

    def flush(self):
        """Commit all pending changes, if there are any."""
        if self.current_change_set:
            self.persist_pending_changes()

    def get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Return the change history of a record.

        Pending changes are flushed first, so the result reflects every
        change queued so far.

        Args:
            record_id: Identifier of the record.

        Returns:
            A dict that maps the time stamp of each change to a tuple
            ``(user_id, yaml_diff, yaml_content)``, inserted in ascending
            time-stamp order. The dict is empty if no commit touched the
            record's log file.
        """
        self.flush()

        _, yaml_path, log_path = self.get_location_for(record_id)
        # git expects forward slashes in pathspecs and `<rev>:<path>`
        # expressions, independent of the platform's path separator.
        yaml_location = yaml_path.as_posix()
        log_location = log_path.as_posix()

        # Get all commits that updated the log. Those will also have updated
        # the record.
        commit_hashes = self._git_output(
            ['log', '--format=%H', '--', log_location],
        ).splitlines()

        changes = []
        for commit_hash in commit_hashes:
            log_diff_lines = self._git_output(
                ['show', '--format=%b', commit_hash, '--', log_location],
            ).splitlines()
            # The log entry is the first added line of the diff; the
            # `+++ b/<path>` header line is not an added line.
            log_entry = next(
                (
                    line[1:]
                    for line in log_diff_lines
                    if line.startswith('+') and not line.startswith('+++')
                ),
                None,
            )
            if log_entry is None:
                # The commit touched the log without adding an entry, so
                # there is no time stamp or user to report for it.
                continue
            time_stamp, user_id = log_entry.split(' ', 1)

            # Get the YAML diff, with empty lines removed.
            yaml_diff_lines = self._git_output(
                ['show', '--format=%b', commit_hash, '--', yaml_location],
            ).splitlines()
            yaml_diff = '\n'.join(
                line for line in yaml_diff_lines if line != ''
            ) + '\n'

            # Get the complete YAML content as of this commit.
            yaml_content = self._git_output(
                ['show', f'{commit_hash}:{yaml_location}'],
            )
            changes.append((time_stamp, user_id, yaml_diff, yaml_content))

        changes.sort()
        return {change[0]: change[1:] for change in changes}

    def _git_output(
        self,
        args: list[str],
    ) -> str:
        """Run git with ``args`` in the repository and return decoded stdout."""
        return call_git(
            args,
            cwd=self.path,
            capture_output=True,
        ).decode()

    def add_elements(
        self,
        location: tuple[str, Path, Path],
        user_id: str,
        record: dict,
    ) -> bool:
        """Queue a record and its log entry if the record content changed.

        Args:
            location: Tuple as returned by :meth:`get_location_for`.
            user_id: Identifier of the user responsible for the change.
            record: New content of the record.

        Returns:
            ``True`` if the record differs from the committed version and
            a change was queued, ``False`` otherwise.
        """
        _, record_path, log_path = location
        existing_record = self.read_record_from_repo_path(record_path)
        if existing_record == record:
            return False

        self.current_change_set[record_path] = yaml.dump(
            data=record,
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
        )
        self.add_log_entry(log_path, user_id)
        return True

    def add_log_entry(
        self,
        log_location: Path,
        user_id: str,
    ) -> None:
        """Queue the log file extended by one ``<time stamp> <user id>`` line.

        The time stamp is the current local time in ISO format (naive, no
        time zone information).

        Args:
            log_location: Repository-relative path of the log file.
            user_id: Identifier of the user responsible for the change.
        """
        time_stamp = datetime.now().isoformat()
        log_content = self.read_from_repo_path(log_location).decode()
        log_content += f'{time_stamp} {user_id}\n'
        self.current_change_set[log_location] = log_content

    def read_from_repo_path(
        self,
        path: Path,
    ) -> bytes:
        """Return the committed content of ``path``.

        Args:
            path: Repository-relative path of the file.

        Returns:
            The file content, or ``b''`` if git exits with status 128
            (e.g. because the path does not exist).

        Raises:
            CommandError: If git fails with any other exit status.
        """
        # NOTE(review): the branch name `master` is hard-coded. If the
        # commits created by `apply_changeset` end up on another branch
        # (e.g. because `init.defaultBranch` is configured), every read
        # yields b'' -- confirm against `apply_changeset`.
        try:
            return call_git(
                ['cat-file', '-p', f'master:{path.as_posix()}'],
                cwd=self.path,
                capture_output=True,
            )
        except CommandError as e:
            if e.returncode == 128:
                return b''
            raise

    def read_record_from_repo_path(
        self,
        path: Path,
    ):
        """Return the committed record at ``path``, parsed from YAML.

        The result is whatever ``yaml.safe_load`` yields for the content
        returned by :meth:`read_from_repo_path`.
        """
        return yaml.safe_load(self.read_from_repo_path(path))

    def has_pending_changes(
        self,
        location: tuple[str, Path, Path],
    ) -> bool:
        """Check whether a record has uncommitted changes.

        Args:
            location: Tuple as returned by :meth:`get_location_for`.

        Returns:
            ``True`` if both the record file and its log file are part of
            the current change set, ``False`` if neither is.

        Raises:
            SystemError: If only one of the two files is part of the
                current change set.
        """
        _, record_path, log_path = location
        record_pending = record_path in self.current_change_set
        log_pending = log_path in self.current_change_set
        if record_pending != log_pending:
            msg = (
                'change status mismatch: changed: '
                f'{record_path} ({record_pending}), '
                f'{log_path} ({log_pending})'
            )
            raise SystemError(msg)
        return record_pending

    def persist_pending_changes(self) -> None:
        """Commit the current change set and start a new, empty one."""
        apply_changeset(
            self.repo,
            self.current_change_set,
            message='persist changes',
        )
        self.current_change_set = {}

    def get_location_for(
        self,
        record_id: str,
    ) -> tuple[str, Path, Path]:
        """Map a record id to its storage location.

        Args:
            record_id: Identifier of the record.

        Returns:
            A tuple ``(base, yaml_path, log_path)``. ``base`` is the SHA-1
            hex digest of ``record_id``; both paths are
            ``<base[0:3]>/<base[3:6]>/<base>`` with the suffix ``.yaml``
            and ``.log`` respectively.
        """
        base = hashlib.sha1(record_id.encode()).hexdigest()
        location_dir = Path(base[0:3]) / Path(base[3:6])
        return (
            base,
            location_dir / (base + '.yaml'),
            location_dir / (base + '.log'),
        )

    def init_repo(self) -> 'Repo':
        """Create the bare repository if needed and return a handle to it.

        A missing directory is created. A directory without any
        subdirectories is treated as empty: a bare repository is
        initialized in it and ``README.txt`` is committed. Any other
        directory is opened as is and left unmodified.

        Returns:
            The ``Repo`` object for ``self.path``, which is also stored in
            ``self.repo``.
        """
        if self.path.exists():
            # `glob('**')` yields the directory itself plus all of its
            # subdirectories, so a count of one means "no subdirectories".
            is_empty = len(tuple(Path(self.path).glob('**'))) == 1
        else:
            self.path.mkdir(parents=True)
            is_empty = True

        if is_empty:
            call_git(['init', '--bare', str(self.path)], cwd=self.path)

        # Always create the handle, so that opening an existing repository
        # works as well.
        self.repo = Repo(self.path)

        if is_empty:
            apply_changeset(
                self.repo,
                {'README.txt': 'A git-based audit backend\n'},
                message='add README.txt',
            )
        return self.repo