dump-things-server/dump_things_service/audit/gitaudit.py
Christian Monch f20daac102
All checks were successful
Test execution / Test-all (push) Successful in 1m52s
add gitaudit auto_flush_timeout
This commit adds the configuration `auto_flush_timeout` to
the gitaudit-configuration. If `auto_flush_timeout` seconds
have passed without adding a new audit log entry and there
is a non-empty changeset, i.e., the changeset was last
touched more than `auto_flush_timeout` seconds ago, the
changeset will be persisted. This results in a commit.

For example, if `auto_flush_timeout` is 60, but every 50
seconds a log-entry for a new record is added to the
gitaudit-instance, the automated persisting of the
changeset will never be triggered (the changeset might
still be persisted, depending on the content of the
changes).

If, on the other hand, a single change was added and
no other change is added in the next `auto_flush_timeout`
seconds, the single change will be persisted,
resulting in a commit with a single change.
2026-03-23 08:58:16 +01:00

353 lines
10 KiB
Python

"""A git-based audit backend
The backend minimizes commits by caching changes until an already
changed record is changed again. In this case all changes are
committed.
Changes are annotated with a time stamp and a user-id.
"""
from __future__ import annotations
import hashlib
import json
import re
import time
from datetime import datetime
from pathlib import Path
from threading import (
Lock,
Thread,
)
import yaml
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo
from datalad_core.runners import (
call_git,
CommandError,
)
from . import AuditBackend
index_file_name = 'gitaudit_index.log'
class FlushingThread(Thread):
    """Daemon thread that periodically persists a backend's pending changes.

    Once per second the thread compares the backend's ``last_flush_time``
    against ``auto_flush_timeout`` and calls ``backend.flush()`` when more
    than ``auto_flush_timeout`` seconds have elapsed since the last flush.
    """
    def __init__(
        self,
        backend: GitAuditBackend,
        auto_flush_timeout: int,
    ):
        super().__init__()
        # State read by the polling loop and by the owning backend.
        self.backend = backend
        self.auto_flush_timeout = auto_flush_timeout
        self.exit_requested = False
        # Daemon mode: do not keep the interpreter alive at shutdown.
        self.daemon = True

    def request_exit(self):
        """Ask the polling loop to terminate after its current iteration."""
        self.exit_requested = True

    def run(self):
        """Poll once per second; flush the backend when the timeout elapsed."""
        while True:
            if self.exit_requested:
                return
            time.sleep(1)
            idle_for = time.time() - self.backend.last_flush_time
            if idle_for > self.auto_flush_timeout:
                self.backend.flush()
class GitAuditBackend(AuditBackend):
    """A git-based audit backend.

    Each record is serialized to a YAML file inside a bare git repository;
    every change additionally appends a JSON line to a companion ``.log``
    file.  Changes are first staged in ``current_change_set`` and only
    committed (via ``apply_changeset``) when an already-changed record is
    changed again, when a flush occurs (explicit, automatic via the
    ``FlushingThread``, or before any audit-log read), so that commits are
    minimized.
    """

    def __init__(
        self,
        path: Path,
        auto_flush_timeout: int = 60,
    ):
        """Create a backend rooted at ``path``.

        :param path: directory of the bare git repository; created and
            initialized if it does not exist or is empty.
        :param auto_flush_timeout: seconds of flush inactivity after which
            the background thread persists the pending changeset; must be
            greater or equal to 1.
        :raises ValueError: if ``auto_flush_timeout`` is smaller than 1.
        """
        self.path = path
        self.index_path = None          # set in `_init_repo`
        self.cached_index_entries = []  # record-ids not yet written to the index file
        self.current_change_set = {}    # repo-relative Path -> new file content
        self.lock = Lock()              # serializes all public operations
        self.last_flush_time = 0        # epoch seconds of the last flush
        if auto_flush_timeout < 1:
            raise ValueError('auto_flush_timeout must be greater or equal to 1')
        # NOTE(review): the flushing thread is started before `_init_repo`
        # runs; with `last_flush_time == 0` it may call `flush()` almost
        # immediately.  That is harmless only while the changeset and the
        # cached index entries are empty -- consider starting it afterwards.
        self.flushing_thread = FlushingThread(self, auto_flush_timeout)
        self.flushing_thread.start()
        self._init_repo()

    def __del__(self):
        # Persist anything still pending, then stop the flushing thread.
        # NOTE(review): if `__init__` raised before `flushing_thread` was
        # assigned, this fails with AttributeError -- TODO confirm whether
        # that path matters in practice.
        self.flush()
        if self.flushing_thread:
            self.flushing_thread.request_exit()
            self.flushing_thread.join()
            self.flushing_thread = None

    def add_record(
        self,
        record: dict,
        committer_id: str,
        author_id: str | None = None,
    ) -> None:
        """Add ``record`` to the audit trail.

        :param record: record content; must contain a ``pid`` key that is
            used as the record-id.
        :param committer_id: id of the entity performing the change.
        :param author_id: id of the original author; defaults to
            ``committer_id``.
        """
        with self.lock:
            author_id = committer_id if author_id is None else author_id
            record_id = record['pid']
            location = self._get_location_for(record_id)
            # If this record already has uncommitted changes, commit them
            # first -- at most one pending change per record is kept, which
            # is what minimizes commits.
            if self._has_pending_changes(location):
                self._persist_pending_changes()
            self._add_elements(record_id, location, committer_id, author_id, record)

    def flush(self):
        """Persist the pending changeset and the cached index entries."""
        with self.lock:
            self._locked_flush()

    def _locked_flush(self):
        # Caller must hold `self.lock`.
        if self.current_change_set:
            self._persist_pending_changes()
        if self.cached_index_entries:
            # Append newly seen record-ids to the on-disk index file.
            with self.index_path.open('at') as f:
                f.write('\n'.join(self.cached_index_entries) + '\n')
            self.cached_index_entries = []
        self.last_flush_time = time.time()

    def get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Return the audit log for ``record_id``.

        :returns: mapping of ISO time stamp to the tuple
            ``(committer_id, author_id, yaml_diff, yaml_content)``.
        """
        with self.lock:
            return self._locked_get_audit_log(record_id)

    def _locked_get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        # Caller must hold `self.lock`.  Flush first so that all pending
        # changes are visible in git history.
        self._locked_flush()
        # Get all commits that updated the log. Those will also have updated
        # the records
        changes = []
        yaml_location, log_location = map(str, self._get_location_for(record_id)[1:])
        commit_hashes = call_git(
            ['log', '--format=%H', '--', log_location],
            cwd=self.path,
            capture_output=True,
        ).decode().splitlines()
        for commit_hash in commit_hashes:
            log_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', log_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            # Get the log entry: the single added line of the log-file diff,
            # i.e. '+'-prefixed but not the '+++' file header.
            log_line = tuple(
                filter(
                    lambda l: not l.startswith('+++') and l.startswith('+'),
                    log_diff_lines,
                )
            )[0][1:]
            log_entry = json.loads(log_line)
            # Get the YAML diff
            yaml_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', yaml_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            yaml_diff = '\n'.join(filter(lambda l: l != '', yaml_diff_lines)) + '\n'
            # Get the YAML content
            yaml_content = call_git(
                ['show', f'{commit_hash}:{yaml_location}'],
                cwd=self.path,
                capture_output=True,
            ).decode()
            changes.append(
                (
                    log_entry['time_stamp'],
                    log_entry['committer_id'],
                    log_entry['author_id'],
                    yaml_diff,
                    yaml_content,
                )
            )
        # Sort by ISO time stamp (lexicographic order is chronological
        # as long as all stamps share the same offset format).
        changes.sort()
        return {c[0]: c[1:] for c in changes}

    def get_audit_logs(
        self,
        record_id_pattern: str,
    ) -> dict:
        """Return audit logs of all records whose id fully matches the
        regular expression ``record_id_pattern``.

        :returns: mapping of record-id to its audit log (see
            :meth:`get_audit_log`), sorted by record-id.
        """
        with self.lock:
            return self._locked_get_audit_logs(record_id_pattern)

    def _locked_get_audit_logs(
        self,
        record_id_pattern: str,
    ) -> dict:
        # Caller must hold `self.lock`.
        self._locked_flush()
        matcher = re.compile(record_id_pattern)
        matching_ids = tuple(
            filter(
                lambda record_id: matcher.fullmatch(record_id) is not None,
                self.index,
            )
        )
        return {
            record_id: self._locked_get_audit_log(record_id)
            for record_id in sorted(matching_ids)
        }

    def _add_elements(
        self,
        record_id: str,
        location: tuple[str, Path, Path],
        committer_id: str,
        author_id: str,
        record: dict,
    ) -> bool:
        # Stage the record, a log entry, and an index entry -- but only if
        # the record differs from the version committed to the repository.
        # Returns True if something was staged, False otherwise.
        existing_record = self._read_record_from_repo_path(location[1])
        if existing_record != record:
            self.current_change_set[location[1]] = yaml.dump(
                data=record,
                sort_keys=False,
                allow_unicode=True,
                default_flow_style=False,
            )
            self._add_log_entry(location[2], committer_id, author_id)
            self._add_index_entry(record_id)
            return True
        return False

    def _add_log_entry(
        self,
        log_location: Path,
        committer_id: str,
        author_id: str,
    ) -> None:
        # Append a JSON line with time stamp, committer-id, and author-id to
        # the record's log file (committed content plus the new entry).
        # NOTE(review): `datetime.now()` yields a naive local time stamp;
        # log ordering relies on lexicographic ISO sorting -- consider UTC.
        time_stamp = datetime.now().isoformat()
        entry = {
            'time_stamp': time_stamp,
            'committer_id': committer_id,
            'author_id': author_id,
        }
        log_content = self._read_from_repo_path(log_location).decode()
        log_content += json.dumps(entry, ensure_ascii=False) + '\n'
        self.current_change_set[log_location] = log_content

    def _add_index_entry(
        self,
        record_id: str,
    ):
        # Register a record-id in the in-memory index and queue it for the
        # on-disk index file.
        # NOTE(review): duplicated verbatim by `_add_to_index` below -- one
        # of the two should probably be removed.
        if record_id not in self.index:
            self.cached_index_entries.append(record_id)
            self.index.add(record_id)

    def _read_from_repo_path(
        self,
        path: Path,
    ) -> bytes:
        # Return the committed content of `path` on `master`, or b'' if the
        # path does not exist yet (git signals that with exit code 128).
        try:
            return call_git(
                ['cat-file', '-p', f'master:{str(path)}'],
                cwd=self.path,
                capture_output=True,
            )
        except CommandError as e:
            if e.returncode == 128:
                return b''
            raise

    def _read_record_from_repo_path(
        self,
        path: Path,
    ):
        # YAML-parse the committed record; yields None for empty content.
        return yaml.safe_load(self._read_from_repo_path(path))

    def _has_pending_changes(
        self,
        location: tuple[str, Path, Path],
    ) -> bool:
        # A record and its log must always be staged together; a mismatch
        # indicates an internal consistency error.
        # NOTE(review): the variable names are swapped -- location[1] is the
        # YAML (record) path and location[2] is the log path.  The XOR check
        # itself is unaffected.
        log_pending = location[1] in self.current_change_set
        record_pending = location[2] in self.current_change_set
        if log_pending != record_pending:
            msg = (
                f'change status mismatch: changed: '
                f'{location[1]} ({log_pending}), '
                f'{location[2]} ({record_pending})'
            )
            raise SystemError(msg)
        return log_pending

    def _persist_pending_changes(self) -> None:
        # Commit all staged file contents in a single commit.
        apply_changeset(
            self.repo,
            self.current_change_set,
            message='persist changes',
        )
        self.current_change_set = {}

    def _get_location_for(
        self,
        record_id: str,
    ) -> tuple[str, Path, Path]:
        # Map a record-id to repo-relative file paths via its SHA1 hex
        # digest, fanned out over two 3-character directory levels.
        # NOTE(review): `name` is unused; the full digest is the file name.
        base = hashlib.sha1(record_id.encode()).hexdigest()
        dir_1, dir_2, name = base[0:3], base[3:6], base[6:]
        location_dir = Path(dir_1) / Path(dir_2)
        return (
            base,
            location_dir / (base + '.yaml'),
            location_dir / (base + '.log'),
        )

    def _init_repo(self) -> None:
        # Create or open the bare repository at `self.path` and load the
        # record-id index, rebuilding it from the repo if it is missing.
        if self.path.exists():
            # An empty directory matches only itself under '**'.
            # NOTE(review): a directory containing only files (no
            # subdirectories) also passes this emptiness test -- TODO
            # confirm that is intended.
            is_empty = len(tuple(Path(self.path).glob('**'))) == 1
        else:
            self.path.mkdir(parents=True)
            is_empty = True
        self.index_path = self.path / index_file_name
        if is_empty:
            call_git(['init', '--bare', str(self.path)], capture_output=True)
            self.repo = Repo(self.path)
            # Seed the repository so that `master` exists for later reads.
            apply_changeset(
                self.repo,
                {'README.txt': 'A git-based audit backend\n'},
                message='add README.txt',
            )
            self.index_path.write_text('')
        else:
            self.repo = Repo(self.path)
            if not self.index_path.exists():
                self._rebuild_index()
        with open(self.index_path, 'rt') as f:
            self.index = set(line.strip() for line in f.readlines())

    def _add_to_index(
        self,
        record_id: str,
    ):
        # NOTE(review): verbatim duplicate of `_add_index_entry` above and
        # apparently unused within this module -- candidate for removal.
        if record_id not in self.index:
            self.cached_index_entries.append(record_id)
            self.index.add(record_id)

    def _rebuild_index(self):
        # Recreate the index file from all committed '.yaml' records on
        # `master` by extracting each record's 'pid'.
        tree_entries = call_git(
            ['ls-tree', '-r', 'master:'],
            cwd=self.path,
            capture_output=True,
        ).decode().splitlines()
        with open(self.index_path, 'wt') as f:
            for line in tree_entries:
                if not line.endswith('.yaml'):
                    continue
                # ls-tree line format: "<mode> <type> <hash>\t<path>"
                flag, object_type, object_hash, file_name = line.split(maxsplit=3)
                record = yaml.safe_load(
                    call_git(
                        ['show', object_hash],
                        cwd=self.path,
                        capture_output=True,
                    ).decode()
                )
                f.write(record['pid'] + '\n')