add gitaudit autoflush #202

Merged
cmo merged 1 commit from gitaudit_auto_flush into master 2026-03-23 08:06:25 +00:00
6 changed files with 130 additions and 11 deletions

View file

@ -1,3 +1,14 @@
# 5.6.1 (2026-03-20)
## Bugfixes
- Ensure that pending gitaudit log entries are always persisted after
a defined *timeout*. In version 5.6.0, pending changes could remain
cached until the server was shut down, and only be written then.
This change ensures that `dump-things-gitaudit-report` will report
all changes after *timeout* seconds.
# 5.6.0 (2026-03-20)
## New features

View file

@ -520,13 +520,16 @@ collections:
audit-backends:
- type: gitaudit
path: <path to directory>
auto_flush_timeout: <seconds>
...
```
Here `<path to directory>` must be a path to a directory.
If the directory does not exist, it will be created.
If the directory exists, it should contain a bare git repository.
If `auto_flush_timeout` seconds have passed without adding new audit log entries, the current changeset is persisted, resulting in a commit.
The default value for `auto_flush_timeout` is 60 seconds. The value must not be lower than `1`.
The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (patterns use Python `re`-module syntax, e.g. use `'.*'` to report changes for all PIDs).
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.

View file

@ -6,11 +6,18 @@ committed.
Changes are annotated with a time stamp and a user-id
"""
from __future__ import annotations
import hashlib
import json
import re
import time
from datetime import datetime
from pathlib import Path
from threading import (
Lock,
Thread,
)
import yaml
from datalad_core.git_utils import apply_changeset
@ -26,24 +33,61 @@ from . import AuditBackend
index_file_name = 'gitaudit_index.log'
class FlushingThread(Thread):
    """Daemon thread that periodically auto-flushes a GitAuditBackend.

    Roughly once per second the thread checks whether more than
    ``auto_flush_timeout`` seconds have passed since the backend's last
    flush and, if so, calls ``backend.flush()``.
    """
    def __init__(
        self,
        backend: GitAuditBackend,
        auto_flush_timeout: int,
    ):
        """
        :param backend: backend whose ``last_flush_time`` is monitored
            and whose ``flush()`` is invoked on timeout
        :param auto_flush_timeout: seconds of flush-inactivity after
            which a flush is triggered
        """
        super().__init__()
        self.auto_flush_timeout = auto_flush_timeout
        self.backend = backend
        # Event instead of a polled bool: request_exit() wakes the
        # thread immediately, so a subsequent join() does not block for
        # up to the full one-second sleep interval.
        self._exit_event = Event()
        # Kept for backward compatibility with code that reads the flag.
        self.exit_requested = False
        self.daemon = True

    def request_exit(self):
        """Ask the thread to terminate; wakes it immediately."""
        self.exit_requested = True
        self._exit_event.set()

    def run(self):
        # wait() returns True as soon as the event is set (prompt exit);
        # otherwise it times out after one second and we check whether
        # an auto-flush is due.
        while not self._exit_event.wait(1):
            # NOTE(review): last_flush_time is read without the
            # backend's lock; a stale value only delays the flush by
            # one tick, which is acceptable here.
            if time.time() - self.backend.last_flush_time > self.auto_flush_timeout:
                self.backend.flush()
class GitAuditBackend(AuditBackend):
def __init__(
    self,
    path: Path,
    auto_flush_timeout: int = 60,
):
    """Create a git-based audit backend.

    :param path: directory that holds (or will hold) the bare git
        repository storing the audit log
    :param auto_flush_timeout: seconds without new audit entries after
        which the pending changeset is persisted; must be >= 1
        (default: 60)
    :raises ValueError: if ``auto_flush_timeout`` is lower than 1
    """
    # Validate before touching any external state, so a bad config
    # leaves nothing behind (no thread, no repository).
    if auto_flush_timeout < 1:
        raise ValueError('auto_flush_timeout must be greater or equal to 1')

    self.path = path
    self.index_path = None
    self.cached_index_entries = []
    self.current_change_set = {}
    self.lock = Lock()
    self.last_flush_time = 0
    # Pre-set to None so __del__ is safe even if _init_repo() raises.
    self.flushing_thread = None

    # Initialize the repository BEFORE starting the flushing thread, so
    # the thread can never observe a half-initialized backend and is
    # not leaked when repository setup fails.
    self._init_repo()
    self.flushing_thread = FlushingThread(self, auto_flush_timeout)
    self.flushing_thread.start()
def __del__(self):
    """Flush pending state and stop the flushing thread on teardown.

    The instance may be only partially initialized if ``__init__``
    raised (e.g. on an invalid ``auto_flush_timeout``), so every
    attribute access is guarded to avoid a spurious ``AttributeError``
    during garbage collection.
    """
    thread = getattr(self, 'flushing_thread', None)
    # Only flush when the lock (and thus the flush machinery) exists.
    if getattr(self, 'lock', None) is not None:
        self.flush()
    if thread is not None:
        thread.request_exit()
        thread.join()
        self.flushing_thread = None
def add_record(
self,
record: dict,
committer_id: str,
author_id: str | None = None,
) -> None:
with self.lock:
author_id = committer_id if author_id is None else author_id
record_id = record['pid']
location = self._get_location_for(record_id)
@ -52,18 +96,30 @@ class GitAuditBackend(AuditBackend):
self._add_elements(record_id, location, committer_id, author_id, record)
def flush(self):
    """Persist all pending audit-log state.

    Thread-safe entry point: takes the backend lock and delegates to
    ``_locked_flush``.
    """
    self.lock.acquire()
    try:
        self._locked_flush()
    finally:
        self.lock.release()
def _locked_flush(self):
    """Persist the pending changeset and cached index entries.

    The caller must hold ``self.lock``. Updates ``last_flush_time``
    so the flushing thread restarts its timeout window.
    """
    if self.current_change_set:
        self._persist_pending_changes()
    pending = self.cached_index_entries
    if pending:
        # Append all cached entries in one write, one entry per line.
        with self.index_path.open('at') as index_file:
            index_file.write('\n'.join(pending) + '\n')
        self.cached_index_entries = []
    self.last_flush_time = time.time()
def get_audit_log(
    self,
    record_id: str,
) -> dict:
    """Return the audit log for ``record_id``.

    ``_locked_get_audit_log`` flushes pending changes while holding
    the lock, so the result always reflects all recorded changes.
    """
    # No separate unlocked flush() here: _locked_get_audit_log already
    # flushes under the lock; an extra flush would only acquire and
    # release the lock one more time and open a window for new,
    # unflushed entries to slip in between flush and read.
    with self.lock:
        return self._locked_get_audit_log(record_id)
def _locked_get_audit_log(
self,
record_id: str,
) -> dict:
self._locked_flush()
# Get all commits that updated the log. Those will also have updated
# the records
@ -120,7 +176,14 @@ class GitAuditBackend(AuditBackend):
self,
record_id_pattern: str,
) -> dict:
self.flush()
with self.lock:
return self._locked_get_audit_logs(record_id_pattern)
def _locked_get_audit_logs(
self,
record_id_pattern: str,
) -> dict:
self._locked_flush()
matcher = re.compile(record_id_pattern)
matching_ids = tuple(
filter(
@ -129,7 +192,7 @@ class GitAuditBackend(AuditBackend):
)
)
return {
record_id: self.get_audit_log(record_id)
record_id: self._locked_get_audit_log(record_id)
for record_id in sorted(matching_ids)
}

View file

@ -133,6 +133,7 @@ class ConfigAuthConfig(StrictModel):
class GitAuditBackendConfig(StrictModel):
    """Configuration entry for a ``gitaudit`` audit backend."""
    # Discriminator selecting this backend type in the config file.
    type: Literal['gitaudit']
    # Directory containing (or that will contain) the bare git repo.
    path: Path
    # Seconds of inactivity after which pending audit entries are
    # committed; must be >= 1 (validated by GitAuditBackend.__init__).
    auto_flush_timeout: int = 60
class TagConfig(StrictModel):
@ -458,7 +459,7 @@ def process_config_object(
instance_config.audit_backends[collection_name] = []
for audit_backend in collection_info.audit_backends:
instance_config.audit_backends[collection_name].append(
GitAuditBackend(audit_backend.path)
GitAuditBackend(audit_backend.path, audit_backend.auto_flush_timeout)
)
# Create validator for each collection

View file

@ -45,6 +45,7 @@ collections:
audit_backends:
- type: gitaudit
path: {{audit_store_path}}
auto_flush_timeout: 2
collection_2:
default_token: basic_access
curated: {curated}/collection_2

View file

@ -1,7 +1,9 @@
from __future__ import annotations
import pytest
import time
import yaml
from itertools import count
from dump_things_service import (
HTTP_200_OK,
@ -147,3 +149,41 @@ def test_audit_backend(fastapi_client_simple):
assert values[i][0] == user_names[i]
assert values[i][1] == f'author_{i}@www.org'
assert yaml.safe_load(values[i][3]) == json_objects[i]
def test_audit_backend_auto_flush(fastapi_client_simple):
    """Verify that pending gitaudit changes are flushed automatically.

    Posts two records and then waits (up to 10 seconds) for the
    backend's flushing thread to persist the pending changeset; the
    test config sets ``auto_flush_timeout`` to 2 seconds.
    """
    test_client, _ = fastapi_client_simple

    record_id = 'abc:audit-trailed'
    names = 'Robert', 'Anton'
    tokens = 'token_1_xxxxx', 'token_admin'
    json_objects = tuple(
        {
            'schema_type': 'abc:Person',
            'pid': record_id,
            'given_name': name,
        }
        for name in names
    )
    for i in range(2):
        response = test_client.post(
            f'/collection_1/curated/record/Person?author_id=author_{i}@www.org',
            headers={'x-dumpthings-token': tokens[i]},
            json=json_objects[i],
        )
        assert response.status_code == HTTP_200_OK

    config_instance = get_config()
    audit_backend = config_instance.audit_backends['collection_1'][0]
    assert audit_backend.current_change_set, 'expected unpersisted changes in audit log'

    # Poll once per second until the flushing thread clears the pending
    # changeset. The original loop incremented the `count()` variable by
    # hand (a no-op for the iterator that skewed the timeout check);
    # a plain for/else over range() expresses the bounded wait directly.
    for _ in range(10):
        if not audit_backend.current_change_set:
            break
        time.sleep(1)
    else:
        raise ValueError('auto flush did not trigger within 10 seconds')