add gitaudit autoflush #202

Merged
cmo merged 1 commit from gitaudit_auto_flush into master 2026-03-23 08:06:25 +00:00
6 changed files with 130 additions and 11 deletions

View file

@ -1,3 +1,14 @@
# 5.6.1 (2026-03-20)
## Bugfixes
- Ensure that pending gitaudit log entries are always persisted after
a defined *timeout*. In version 5.6.0, pending changes could have been
cached until the server was shut down, and only written then.
This change ensures that `dump-things-gitaudit-report` will report
all changes after *timeout* seconds.
# 5.6.0 (2026-03-20) # 5.6.0 (2026-03-20)
## New features ## New features

View file

@ -520,13 +520,16 @@ collections:
audit-backends: audit-backends:
- type: gitaudit - type: gitaudit
path: <path to directory> path: <path to directory>
auto_flush_timeout: <seconds>
... ...
``` ```
Here `<path to directory>` must be a path to a directory. Here `<path to directory>` must be a path to a directory.
If the directory does not exist, it will be created. If the directory does not exist, it will be created.
If the directory exists, it should contain a bare git repository. If the directory exists, it should contain a bare git repository.
If `auto_flush_timeout` seconds have passed without adding new audit log entries, the current changeset is persisted, resulting in a commit.
The default value for `auto_flush_timeout` is 60 seconds. The value must not be lower than `1`.
The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (patterns are in Python `re`-module syntax, i.e. use `'.*'` to report changes for all PIDs). The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (patterns are in Python `re`-module syntax, i.e. use `'.*'` to report changes for all PIDs).
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record. Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.

View file

@ -6,11 +6,18 @@ committed.
Changes are annotated with a time stamp and a user-id Changes are annotated with a time stamp and a user-id
""" """
from __future__ import annotations
import hashlib import hashlib
import json import json
import re import re
import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from threading import (
Lock,
Thread,
)
import yaml import yaml
from datalad_core.git_utils import apply_changeset from datalad_core.git_utils import apply_changeset
@ -26,24 +33,61 @@ from . import AuditBackend
index_file_name = 'gitaudit_index.log' index_file_name = 'gitaudit_index.log'
class FlushingThread(Thread):
    """Background daemon thread that periodically persists pending audit data.

    Once per second the thread checks how long ago the associated backend
    was last flushed.  If more than ``auto_flush_timeout`` seconds have
    passed since then, it triggers ``backend.flush()``.
    """

    def __init__(
        self,
        backend: GitAuditBackend,
        auto_flush_timeout: int,
    ):
        super().__init__()
        # Daemon mode lets the interpreter exit even while the polling
        # loop is still running.
        self.daemon = True
        self.backend = backend
        self.auto_flush_timeout = auto_flush_timeout
        self.exit_requested = False

    def request_exit(self):
        """Ask the polling loop to terminate after its current iteration."""
        self.exit_requested = True

    def run(self):
        # Poll once per second; flush when the backend has been idle for
        # longer than the configured timeout.
        while not self.exit_requested:
            time.sleep(1)
            idle_seconds = time.time() - self.backend.last_flush_time
            if idle_seconds > self.auto_flush_timeout:
                self.backend.flush()
class GitAuditBackend(AuditBackend): class GitAuditBackend(AuditBackend):
def __init__( def __init__(
self, self,
path: Path, path: Path,
auto_flush_timeout: int = 60,
): ):
self.path = path self.path = path
self.index_path = None self.index_path = None
self.cached_index_entries = [] self.cached_index_entries = []
self.current_change_set = {} self.current_change_set = {}
self.lock = Lock()
self.last_flush_time = 0
if auto_flush_timeout < 1:
raise ValueError('auto_flush_timeout must be greater or equal to 1')
self.flushing_thread = FlushingThread(self, auto_flush_timeout)
self.flushing_thread.start()
self._init_repo() self._init_repo()
def __del__(self):
    """Flush pending audit data and shut down the auto-flush thread.

    Ensures that any cached change set and index entries are persisted
    before the backend object is destroyed.
    """
    # If __init__ raised (e.g. ValueError for an invalid
    # `auto_flush_timeout`), `flushing_thread` was never assigned; guard
    # the lookup so object teardown does not raise an AttributeError.
    thread = getattr(self, 'flushing_thread', None)
    self.flush()
    if thread is not None:
        thread.request_exit()
        # The thread polls once per second, so join() may block for up
        # to about one second before the exit request is noticed.
        thread.join()
        self.flushing_thread = None
def add_record( def add_record(
self, self,
record: dict, record: dict,
committer_id: str, committer_id: str,
author_id: str | None = None, author_id: str | None = None,
) -> None: ) -> None:
with self.lock:
author_id = committer_id if author_id is None else author_id author_id = committer_id if author_id is None else author_id
record_id = record['pid'] record_id = record['pid']
location = self._get_location_for(record_id) location = self._get_location_for(record_id)
@ -52,18 +96,30 @@ class GitAuditBackend(AuditBackend):
self._add_elements(record_id, location, committer_id, author_id, record) self._add_elements(record_id, location, committer_id, author_id, record)
def flush(self): def flush(self):
with self.lock:
self._locked_flush()
def _locked_flush(self):
if self.current_change_set: if self.current_change_set:
self._persist_pending_changes() self._persist_pending_changes()
if self.cached_index_entries: if self.cached_index_entries:
with self.index_path.open('at') as f: with self.index_path.open('at') as f:
f.write('\n'.join(self.cached_index_entries) + '\n') f.write('\n'.join(self.cached_index_entries) + '\n')
self.cached_index_entries = [] self.cached_index_entries = []
self.last_flush_time = time.time()
def get_audit_log( def get_audit_log(
self, self,
record_id: str, record_id: str,
) -> dict: ) -> dict:
self.flush() with self.lock:
return self._locked_get_audit_log(record_id)
def _locked_get_audit_log(
self,
record_id: str,
) -> dict:
self._locked_flush()
# Get all commits that updated the log. Those will also have updated # Get all commits that updated the log. Those will also have updated
# the records # the records
@ -120,7 +176,14 @@ class GitAuditBackend(AuditBackend):
self, self,
record_id_pattern: str, record_id_pattern: str,
) -> dict: ) -> dict:
self.flush() with self.lock:
return self._locked_get_audit_logs(record_id_pattern)
def _locked_get_audit_logs(
self,
record_id_pattern: str,
) -> dict:
self._locked_flush()
matcher = re.compile(record_id_pattern) matcher = re.compile(record_id_pattern)
matching_ids = tuple( matching_ids = tuple(
filter( filter(
@ -129,7 +192,7 @@ class GitAuditBackend(AuditBackend):
) )
) )
return { return {
record_id: self.get_audit_log(record_id) record_id: self._locked_get_audit_log(record_id)
for record_id in sorted(matching_ids) for record_id in sorted(matching_ids)
} }

View file

@ -133,6 +133,7 @@ class ConfigAuthConfig(StrictModel):
class GitAuditBackendConfig(StrictModel): class GitAuditBackendConfig(StrictModel):
type: Literal['gitaudit'] type: Literal['gitaudit']
path: Path path: Path
auto_flush_timeout: int = 60
class TagConfig(StrictModel): class TagConfig(StrictModel):
@ -458,7 +459,7 @@ def process_config_object(
instance_config.audit_backends[collection_name] = [] instance_config.audit_backends[collection_name] = []
for audit_backend in collection_info.audit_backends: for audit_backend in collection_info.audit_backends:
instance_config.audit_backends[collection_name].append( instance_config.audit_backends[collection_name].append(
GitAuditBackend(audit_backend.path) GitAuditBackend(audit_backend.path, audit_backend.auto_flush_timeout)
) )
# Create validator for each collection # Create validator for each collection

View file

@ -45,6 +45,7 @@ collections:
audit_backends: audit_backends:
- type: gitaudit - type: gitaudit
path: {{audit_store_path}} path: {{audit_store_path}}
auto_flush_timeout: 2
collection_2: collection_2:
default_token: basic_access default_token: basic_access
curated: {curated}/collection_2 curated: {curated}/collection_2

View file

@ -1,7 +1,9 @@
from __future__ import annotations from __future__ import annotations
import pytest import pytest
import time
import yaml import yaml
from itertools import count
from dump_things_service import ( from dump_things_service import (
HTTP_200_OK, HTTP_200_OK,
@ -147,3 +149,41 @@ def test_audit_backend(fastapi_client_simple):
assert values[i][0] == user_names[i] assert values[i][0] == user_names[i]
assert values[i][1] == f'author_{i}@www.org' assert values[i][1] == f'author_{i}@www.org'
assert yaml.safe_load(values[i][3]) == json_objects[i] assert yaml.safe_load(values[i][3]) == json_objects[i]
def test_audit_backend_auto_flush(fastapi_client_simple):
    """Verify that pending gitaudit changes are persisted automatically.

    Posts two records, confirms the backend holds an unflushed change
    set, and then waits for the auto-flush thread (configured with
    ``auto_flush_timeout: 2`` in the test config) to persist it.
    """
    test_client, _ = fastapi_client_simple
    record_id = 'abc:audit-trailed'
    names = 'Robert', 'Anton'
    tokens = 'token_1_xxxxx', 'token_admin'
    json_objects = tuple(
        {
            'schema_type': 'abc:Person',
            'pid': record_id,
            'given_name': name,
        }
        for name in names
    )

    # Post two records so that the backend has pending, unflushed changes.
    for i in range(2):
        response = test_client.post(
            f'/collection_1/curated/record/Person?author_id=author_{i}@www.org',
            headers={'x-dumpthings-token': tokens[i]},
            json=json_objects[i],
        )
        assert response.status_code == HTTP_200_OK

    config_instance = get_config()
    audit_backend = config_instance.audit_backends['collection_1'][0]
    assert audit_backend.current_change_set, 'expected unpersisted changes in audit log'

    # Poll for up to 10 seconds; the auto-flush timeout of 2 seconds
    # should trigger well within that window.
    for _ in range(10):
        if not audit_backend.current_change_set:
            break
        time.sleep(1)
    else:
        msg = 'auto flush did not trigger within 10 seconds'
        raise ValueError(msg)