All checks were successful
Test execution / Test-all (push) Successful in 1m52s
This commit adds the configuration `auto_flush_timeout` to the gitaudit-configuration. If `auto_flush_timeout` seconds have passed without adding a new audit log entry and there is a non-empty changeset, i.e., the changeset was last touched more than `auto_flush_timeout` seconds ago, the changeset will be persisted. This results in a commit. For example, if `auto_flush_timeout` is 60, but every 50 seconds a log-entry for a new record is added to the gitaudit-instance, the automated persisting of the changeset will never be triggered (the changeset might still be persisted, depending on the content of the changes). If, on the other hand, a single change was added and no other change is added in the next `auto_flush_timeout` seconds, the single change will be persisted, resulting in a commit with a single change.
353 lines
10 KiB
Python
"""A git-based audit backend
|
|
|
|
The backend minimizes commits by caching changes until an already
|
|
changed record is changed again. In this case all changes are
|
|
committed.
|
|
|
|
Changes are annotated with a time stamp and a user-id
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from threading import (
|
|
Lock,
|
|
Thread,
|
|
)
|
|
|
|
import yaml
|
|
from datalad_core.git_utils import apply_changeset
|
|
from datalad_core.repo import Repo
|
|
from datalad_core.runners import (
|
|
call_git,
|
|
CommandError,
|
|
)
|
|
|
|
from . import AuditBackend
|
|
|
|
|
|
index_file_name = 'gitaudit_index.log'
|
|
|
|
|
|
class FlushingThread(Thread):
    """Daemon thread that periodically asks the backend to flush.

    Once per second the thread compares the backend's
    ``last_flush_time`` against ``auto_flush_timeout`` and calls
    ``backend.flush()`` when more than ``auto_flush_timeout`` seconds
    have passed. The loop terminates after `request_exit` was called.
    """

    def __init__(
        self,
        backend: GitAuditBackend,
        auto_flush_timeout: int,
    ):
        super().__init__()
        self.auto_flush_timeout = auto_flush_timeout
        self.backend = backend
        self.exit_requested = False
        # Daemon thread: must not keep the interpreter alive on exit.
        self.daemon = True

    def request_exit(self):
        """Signal the polling loop to stop after its current cycle."""
        self.exit_requested = True

    def run(self):
        """Poll once per second and flush when the timeout has elapsed."""
        while True:
            if self.exit_requested:
                return
            time.sleep(1)
            idle_seconds = time.time() - self.backend.last_flush_time
            if idle_seconds > self.auto_flush_timeout:
                self.backend.flush()
class GitAuditBackend(AuditBackend):
    """Audit backend that stores record history in a git repository.

    Each record is stored as a YAML file plus a JSON-lines log file,
    both located under a two-level directory derived from the SHA1 of
    the record-id (see `_get_location_for`). Changes are staged in
    ``self.current_change_set`` and committed ("flushed") when an
    already-changed record is changed again, when `flush` is called
    explicitly, or when the background `FlushingThread` notices that
    more than ``auto_flush_timeout`` seconds have passed since the
    last flush.

    All public methods serialize access through ``self.lock``.
    """

    def __init__(
        self,
        path: Path,
        auto_flush_timeout: int = 60,
    ):
        """Create or open the audit repository at ``path``.

        :param path: directory of the (bare) git repository
        :param auto_flush_timeout: seconds after which the background
            thread persists pending changes
        :raises ValueError: if ``auto_flush_timeout`` is smaller than 1
        """
        self.path = path
        self.index_path: Path | None = None
        self.cached_index_entries: list[str] = []
        self.current_change_set: dict[Path, str] = {}
        self.lock = Lock()
        # Pretend a flush just happened so the background thread does
        # not trigger a pointless flush right after startup (the
        # original initialized this to 0, i.e. "flushed in 1970").
        self.last_flush_time = time.time()
        if auto_flush_timeout < 1:
            raise ValueError('auto_flush_timeout must be greater or equal to 1')
        # Fix: initialize the repository *before* starting the flushing
        # thread. Starting the thread first (as the original code did)
        # races against `_init_repo`: the thread could call `flush()`
        # while `self.index_path` was still `None`.
        self._init_repo()
        self.flushing_thread = FlushingThread(self, auto_flush_timeout)
        self.flushing_thread.start()

    def __del__(self):
        # `__del__` can run on a partially constructed instance (e.g.
        # when `__init__` raised the ValueError above before the thread
        # was created), therefore access the attribute defensively.
        flushing_thread = getattr(self, 'flushing_thread', None)
        self.flush()
        if flushing_thread:
            flushing_thread.request_exit()
            flushing_thread.join()
            self.flushing_thread = None

    def add_record(
        self,
        record: dict,
        committer_id: str,
        author_id: str | None = None,
    ) -> None:
        """Add a new version of ``record`` to the audit log.

        ``record`` must carry its record-id under the key ``'pid'``.
        If the record already has pending (staged, uncommitted)
        changes, those are committed first, so every commit contains at
        most one version per record.

        :param record: the record content to audit
        :param committer_id: id of the user committing the change
        :param author_id: id of the change author; defaults to
            ``committer_id``
        """
        with self.lock:
            author_id = committer_id if author_id is None else author_id
            record_id = record['pid']
            location = self._get_location_for(record_id)
            if self._has_pending_changes(location):
                self._persist_pending_changes()
            self._add_elements(record_id, location, committer_id, author_id, record)

    def flush(self):
        """Persist pending changes and index entries (thread-safe)."""
        with self.lock:
            self._locked_flush()

    def _locked_flush(self):
        """Persist pending changes; the caller must hold ``self.lock``.

        Commits the current changeset (if any), appends newly seen
        record-ids to the on-disk index file, and records the flush
        time for the background thread.
        """
        if self.current_change_set:
            self._persist_pending_changes()
        if self.cached_index_entries:
            with self.index_path.open('at') as f:
                f.write('\n'.join(self.cached_index_entries) + '\n')
            self.cached_index_entries = []
        self.last_flush_time = time.time()

    def get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Return the audit log for ``record_id`` (thread-safe).

        See `_locked_get_audit_log` for the result layout.
        """
        with self.lock:
            return self._locked_get_audit_log(record_id)

    def _locked_get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Collect the change history of ``record_id`` from git.

        Flushes first so pending changes are visible. Returns a dict
        mapping each change's time stamp to a
        ``(committer_id, author_id, yaml_diff, yaml_content)`` tuple,
        ordered by time stamp. The caller must hold ``self.lock``.
        """
        self._locked_flush()

        # Get all commits that updated the log. Those will also have updated
        # the records
        changes = []
        yaml_location, log_location = map(str, self._get_location_for(record_id)[1:])
        commit_hashes = call_git(
            ['log', '--format=%H', '--', log_location],
            cwd=self.path,
            capture_output=True,
        ).decode().splitlines()
        for commit_hash in commit_hashes:
            log_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', log_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            # Get the log entry: the added diff line ('+...'), excluding
            # the '+++' file header; strip the leading '+'.
            log_line = tuple(
                filter(
                    lambda l: not l.startswith('+++') and l.startswith('+'),
                    log_diff_lines,
                )
            )[0][1:]
            log_entry = json.loads(log_line)

            # Get the YAML diff (blank lines removed)
            yaml_diff_lines = call_git(
                ['show', '--format=%b', commit_hash, '--', yaml_location],
                cwd=self.path,
                capture_output=True,
            ).decode().splitlines()
            yaml_diff = '\n'.join(filter(lambda l: l != '', yaml_diff_lines)) + '\n'

            # Get the YAML content as of this commit
            yaml_content = call_git(
                ['show', f'{commit_hash}:{yaml_location}'],
                cwd=self.path,
                capture_output=True,
            ).decode()
            changes.append(
                (
                    log_entry['time_stamp'],
                    log_entry['committer_id'],
                    log_entry['author_id'],
                    yaml_diff,
                    yaml_content,
                )
            )

        # ISO-format time stamps sort chronologically as strings.
        changes.sort()
        return {c[0]: c[1:] for c in changes}

    def get_audit_logs(
        self,
        record_id_pattern: str,
    ) -> dict:
        """Return audit logs of all records matching the regex pattern
        (thread-safe)."""
        with self.lock:
            return self._locked_get_audit_logs(record_id_pattern)

    def _locked_get_audit_logs(
        self,
        record_id_pattern: str,
    ) -> dict:
        """Map each record-id fully matching ``record_id_pattern`` to
        its audit log; the caller must hold ``self.lock``."""
        self._locked_flush()
        matcher = re.compile(record_id_pattern)
        matching_ids = tuple(
            filter(
                lambda record_id: matcher.fullmatch(record_id) is not None,
                self.index,
            )
        )
        return {
            record_id: self._locked_get_audit_log(record_id)
            for record_id in sorted(matching_ids)
        }

    def _add_elements(
        self,
        record_id: str,
        location: tuple[str, Path, Path],
        committer_id: str,
        author_id: str,
        record: dict,
    ) -> bool:
        """Stage the record YAML, a log entry, and an index entry.

        Returns ``True`` if ``record`` differs from the version
        currently committed in the repository, ``False`` otherwise (in
        which case nothing is staged).
        """
        existing_record = self._read_record_from_repo_path(location[1])
        if existing_record != record:
            self.current_change_set[location[1]] = yaml.dump(
                data=record,
                sort_keys=False,
                allow_unicode=True,
                default_flow_style=False,
            )
            self._add_log_entry(location[2], committer_id, author_id)
            self._add_index_entry(record_id)
            return True
        return False

    def _add_log_entry(
        self,
        log_location: Path,
        committer_id: str,
        author_id: str,
    ) -> None:
        """Stage a JSON log line (time stamp, committer, author)
        appended to the record's log file."""
        # NOTE(review): naive local time stamp — presumably UTC or a
        # time zone is irrelevant here; confirm against consumers.
        time_stamp = datetime.now().isoformat()
        entry = {
            'time_stamp': time_stamp,
            'committer_id': committer_id,
            'author_id': author_id,
        }
        log_content = self._read_from_repo_path(log_location).decode()
        log_content += json.dumps(entry, ensure_ascii=False) + '\n'
        self.current_change_set[log_location] = log_content

    def _add_index_entry(
        self,
        record_id: str,
    ):
        """Register ``record_id`` in the in-memory index and queue it
        for the on-disk index file (written on flush)."""
        if record_id not in self.index:
            self.cached_index_entries.append(record_id)
            self.index.add(record_id)

    def _read_from_repo_path(
        self,
        path: Path,
    ) -> bytes:
        """Return the committed content of ``path`` on ``master``, or
        ``b''`` if the path does not exist there."""
        try:
            return call_git(
                ['cat-file', '-p', f'master:{str(path)}'],
                cwd=self.path,
                capture_output=True,
            )
        except CommandError as e:
            # git exits with 128 when the object does not exist; treat
            # that as "empty", re-raise anything else.
            if e.returncode == 128:
                return b''
            raise

    def _read_record_from_repo_path(
        self,
        path: Path,
    ):
        """Return the committed record at ``path`` parsed from YAML
        (``None`` if the path does not exist)."""
        return yaml.safe_load(self._read_from_repo_path(path))

    def _has_pending_changes(
        self,
        location: tuple[str, Path, Path],
    ) -> bool:
        """Check whether ``location`` has staged, uncommitted changes.

        The YAML file (``location[1]``) and the log file
        (``location[2]``) are always staged together, so a mismatch
        indicates an internal error.
        """
        # Fix: the original bound these names the wrong way around
        # (location[1] is the record/YAML file, location[2] the log).
        record_pending = location[1] in self.current_change_set
        log_pending = location[2] in self.current_change_set
        if record_pending != log_pending:
            msg = (
                'change status mismatch: changed: '
                f'{location[1]} ({record_pending}), '
                f'{location[2]} ({log_pending})'
            )
            raise SystemError(msg)
        return record_pending

    def _persist_pending_changes(self) -> None:
        """Commit the current changeset to the repository and reset it."""
        apply_changeset(
            self.repo,
            self.current_change_set,
            message='persist changes',
        )
        self.current_change_set = {}

    def _get_location_for(
        self,
        record_id: str,
    ) -> tuple[str, Path, Path]:
        """Map ``record_id`` to ``(sha1-hex, yaml-path, log-path)``.

        Files are sharded into two directory levels of 3 hex characters
        each, derived from the SHA1 of the record-id.
        """
        base = hashlib.sha1(record_id.encode()).hexdigest()
        dir_1, dir_2 = base[0:3], base[3:6]
        location_dir = Path(dir_1) / Path(dir_2)
        return (
            base,
            location_dir / (base + '.yaml'),
            location_dir / (base + '.log'),
        )

    def _init_repo(self) -> None:
        """Create the repository (with README and empty index) if the
        directory is empty/absent, otherwise open it; then load the
        index into memory (rebuilding the index file if missing)."""
        if self.path.exists():
            # NOTE(review): `glob('**')` matches directories; a
            # directory containing only files would count as empty here
            # — confirm this is intended.
            is_empty = len(tuple(Path(self.path).glob('**'))) == 1
        else:
            self.path.mkdir(parents=True)
            is_empty = True
        self.index_path = self.path / index_file_name

        if is_empty:
            call_git(['init', '--bare', str(self.path)], capture_output=True)
            self.repo = Repo(self.path)
            apply_changeset(
                self.repo,
                {'README.txt': 'A git-based audit backend\n'},
                message='add README.txt',
            )
            self.index_path.write_text('')
        else:
            self.repo = Repo(self.path)

        if not self.index_path.exists():
            self._rebuild_index()

        with open(self.index_path, 'rt') as f:
            self.index = set(line.strip() for line in f.readlines())

    def _add_to_index(
        self,
        record_id: str,
    ):
        """Duplicate of `_add_index_entry`; kept for backward
        compatibility and now delegates to it."""
        self._add_index_entry(record_id)

    def _rebuild_index(self):
        """Rebuild the on-disk index file by extracting the ``pid`` of
        every committed ``.yaml`` record in the repository."""
        tree_entries = call_git(
            ['ls-tree', '-r', 'master:'],
            cwd=self.path,
            capture_output=True,
        ).decode().splitlines()
        with open(self.index_path, 'wt') as f:
            for line in tree_entries:
                if not line.endswith('.yaml'):
                    continue
                flag, object_type, object_hash, file_name = line.split(maxsplit=3)
                record = yaml.safe_load(
                    call_git(
                        ['show', object_hash],
                        cwd=self.path,
                        capture_output=True,
                    ).decode()
                )
                f.write(record['pid'] + '\n')