add gitaudit autoflush #202
6 changed files with 130 additions and 11 deletions
11
CHANGELOG.md
11
CHANGELOG.md
|
|
@ -1,3 +1,14 @@
|
|||
# 5.6.1 (2026-03-20)
|
||||
|
||||
## Bugfixes
|
||||
|
||||
- Ensure that pending gitaudit log entries are always persisted after
|
||||
a defined *timeout*. In version 5.6.0 pending changes could have been
|
||||
cached until the server is shutdown, and only be written then.
|
||||
This change ensures that `dump-things-gitaudit-report` will report
|
||||
all changes after *timeout* seconds.
|
||||
|
||||
|
||||
# 5.6.0 (2026-03-20)
|
||||
|
||||
## New features
|
||||
|
|
|
|||
|
|
@ -520,13 +520,16 @@ collections:
|
|||
audit-backends:
|
||||
- type: gitaudit
|
||||
path: <path to directory>
|
||||
|
||||
auto_flush_timeout: <seconds>
|
||||
...
|
||||
```
|
||||
Here `<path to directory>` must be a path to a directory.
|
||||
If the directory does not exist, it will be created.
|
||||
If the directory exists, it should contain a bare git repository.
|
||||
|
||||
If `auto_flush_timeout` seconds have passed without adding new audit log entries, the current changeset is persisted, resulting in a commit.
|
||||
The default value for `auto_flush_timeout` is 60 seconds. The value must not be lower than `1`.
|
||||
|
||||
The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (pattern are in python `re`-module syntax, i.e. use `'.*'` to report changes for all PIDs).
|
||||
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.
|
||||
|
||||
|
|
|
|||
|
|
@ -6,11 +6,18 @@ committed.
|
|||
|
||||
Changes are annotated with a time stamp and a user-id
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from threading import (
|
||||
Lock,
|
||||
Thread,
|
||||
)
|
||||
|
||||
import yaml
|
||||
from datalad_core.git_utils import apply_changeset
|
||||
|
|
@ -26,44 +33,93 @@ from . import AuditBackend
|
|||
index_file_name = 'gitaudit_index.log'
|
||||
|
||||
|
||||
class FlushingThread(Thread):
|
||||
def __init__(
|
||||
self,
|
||||
backend: GitAuditBackend,
|
||||
auto_flush_timeout: int,
|
||||
):
|
||||
super().__init__()
|
||||
self.auto_flush_timeout = auto_flush_timeout
|
||||
self.backend = backend
|
||||
self.exit_requested = False
|
||||
self.daemon = True
|
||||
|
||||
def request_exit(self):
|
||||
self.exit_requested = True
|
||||
|
||||
def run(self):
|
||||
while not self.exit_requested:
|
||||
time.sleep(1)
|
||||
if time.time() - self.backend.last_flush_time > self.auto_flush_timeout:
|
||||
self.backend.flush()
|
||||
|
||||
|
||||
class GitAuditBackend(AuditBackend):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: Path,
|
||||
auto_flush_timeout: int = 60,
|
||||
):
|
||||
self.path = path
|
||||
self.index_path = None
|
||||
self.cached_index_entries = []
|
||||
self.current_change_set = {}
|
||||
self.lock = Lock()
|
||||
self.last_flush_time = 0
|
||||
if auto_flush_timeout < 1:
|
||||
raise ValueError('auto_flush_timeout must be greater or equal to 1')
|
||||
self.flushing_thread = FlushingThread(self, auto_flush_timeout)
|
||||
self.flushing_thread.start()
|
||||
self._init_repo()
|
||||
|
||||
def __del__(self):
|
||||
self.flush()
|
||||
if self.flushing_thread:
|
||||
self.flushing_thread.request_exit()
|
||||
self.flushing_thread.join()
|
||||
self.flushing_thread = None
|
||||
|
||||
def add_record(
|
||||
self,
|
||||
record: dict,
|
||||
committer_id: str,
|
||||
author_id: str | None = None,
|
||||
) -> None:
|
||||
author_id = committer_id if author_id is None else author_id
|
||||
record_id = record['pid']
|
||||
location = self._get_location_for(record_id)
|
||||
if self._has_pending_changes(location):
|
||||
self._persist_pending_changes()
|
||||
self._add_elements(record_id, location, committer_id, author_id, record)
|
||||
with self.lock:
|
||||
author_id = committer_id if author_id is None else author_id
|
||||
record_id = record['pid']
|
||||
location = self._get_location_for(record_id)
|
||||
if self._has_pending_changes(location):
|
||||
self._persist_pending_changes()
|
||||
self._add_elements(record_id, location, committer_id, author_id, record)
|
||||
|
||||
def flush(self):
|
||||
with self.lock:
|
||||
self._locked_flush()
|
||||
|
||||
def _locked_flush(self):
|
||||
if self.current_change_set:
|
||||
self._persist_pending_changes()
|
||||
if self.cached_index_entries:
|
||||
with self.index_path.open('at') as f:
|
||||
f.write('\n'.join(self.cached_index_entries) + '\n')
|
||||
self.cached_index_entries = []
|
||||
self.last_flush_time = time.time()
|
||||
|
||||
def get_audit_log(
|
||||
self,
|
||||
record_id: str,
|
||||
) -> dict:
|
||||
self.flush()
|
||||
with self.lock:
|
||||
return self._locked_get_audit_log(record_id)
|
||||
|
||||
def _locked_get_audit_log(
|
||||
self,
|
||||
record_id: str,
|
||||
) -> dict:
|
||||
self._locked_flush()
|
||||
|
||||
# Get all commits that updated the log. Those will also have updated
|
||||
# the records
|
||||
|
|
@ -120,7 +176,14 @@ class GitAuditBackend(AuditBackend):
|
|||
self,
|
||||
record_id_pattern: str,
|
||||
) -> dict:
|
||||
self.flush()
|
||||
with self.lock:
|
||||
return self._locked_get_audit_logs(record_id_pattern)
|
||||
|
||||
def _locked_get_audit_logs(
|
||||
self,
|
||||
record_id_pattern: str,
|
||||
) -> dict:
|
||||
self._locked_flush()
|
||||
matcher = re.compile(record_id_pattern)
|
||||
matching_ids = tuple(
|
||||
filter(
|
||||
|
|
@ -129,7 +192,7 @@ class GitAuditBackend(AuditBackend):
|
|||
)
|
||||
)
|
||||
return {
|
||||
record_id: self.get_audit_log(record_id)
|
||||
record_id: self._locked_get_audit_log(record_id)
|
||||
for record_id in sorted(matching_ids)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ class ConfigAuthConfig(StrictModel):
|
|||
class GitAuditBackendConfig(StrictModel):
|
||||
type: Literal['gitaudit']
|
||||
path: Path
|
||||
auto_flush_timeout: int = 60
|
||||
|
||||
|
||||
class TagConfig(StrictModel):
|
||||
|
|
@ -458,7 +459,7 @@ def process_config_object(
|
|||
instance_config.audit_backends[collection_name] = []
|
||||
for audit_backend in collection_info.audit_backends:
|
||||
instance_config.audit_backends[collection_name].append(
|
||||
GitAuditBackend(audit_backend.path)
|
||||
GitAuditBackend(audit_backend.path, audit_backend.auto_flush_timeout)
|
||||
)
|
||||
|
||||
# Create validator for each collection
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ collections:
|
|||
audit_backends:
|
||||
- type: gitaudit
|
||||
path: {{audit_store_path}}
|
||||
auto_flush_timeout: 2
|
||||
collection_2:
|
||||
default_token: basic_access
|
||||
curated: {curated}/collection_2
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
import time
|
||||
import yaml
|
||||
from itertools import count
|
||||
|
||||
from dump_things_service import (
|
||||
HTTP_200_OK,
|
||||
|
|
@ -147,3 +149,41 @@ def test_audit_backend(fastapi_client_simple):
|
|||
assert values[i][0] == user_names[i]
|
||||
assert values[i][1] == f'author_{i}@www.org'
|
||||
assert yaml.safe_load(values[i][3]) == json_objects[i]
|
||||
|
||||
|
||||
def test_audit_backend_auto_flush(fastapi_client_simple):
|
||||
test_client, _ = fastapi_client_simple
|
||||
|
||||
record_id = 'abc:audit-trailed'
|
||||
names = 'Robert', 'Anton'
|
||||
tokens = 'token_1_xxxxx', 'token_admin'
|
||||
user_names = 'test_user_1_curated', 'test_admin'
|
||||
json_objects = tuple(
|
||||
{
|
||||
'schema_type': 'abc:Person',
|
||||
'pid': record_id,
|
||||
'given_name': name,
|
||||
}
|
||||
for name in names
|
||||
)
|
||||
|
||||
for i in range(2):
|
||||
response = test_client.post(
|
||||
f'/collection_1/curated/record/Person?author_id=author_{i}@www.org',
|
||||
headers={'x-dumpthings-token': tokens[i]},
|
||||
json=json_objects[i],
|
||||
)
|
||||
assert response.status_code == HTTP_200_OK
|
||||
|
||||
config_instance = get_config()
|
||||
audit_backend = config_instance.audit_backends['collection_1'][0]
|
||||
|
||||
assert audit_backend.current_change_set, 'expected unpersisted changes in audit log'
|
||||
|
||||
for i in count():
|
||||
if not audit_backend.current_change_set:
|
||||
break
|
||||
i += 1
|
||||
if i == 10:
|
||||
raise ValueError(f'auto flush did not trigger within 10 seconds')
|
||||
time.sleep(1)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue