add gitaudit autoflush #202
6 changed files with 130 additions and 11 deletions
11
CHANGELOG.md
11
CHANGELOG.md
|
|
@ -1,3 +1,14 @@
|
||||||
|
# 5.6.1 (2026-03-20)
|
||||||
|
|
||||||
|
## Bugfixes
|
||||||
|
|
||||||
|
- Ensure that pending gitaudit log entries are always persisted after
|
||||||
|
a defined *timeout*. In version 5.6.0 pending changes could have been
|
||||||
|
cached until the server is shut down, and only written then.
|
||||||
|
This change ensures that `dump-things-gitaudit-report` will report
|
||||||
|
all changes after *timeout* seconds.
|
||||||
|
|
||||||
|
|
||||||
# 5.6.0 (2026-03-20)
|
# 5.6.0 (2026-03-20)
|
||||||
|
|
||||||
## New features
|
## New features
|
||||||
|
|
|
||||||
|
|
@ -520,13 +520,16 @@ collections:
|
||||||
audit-backends:
|
audit-backends:
|
||||||
- type: gitaudit
|
- type: gitaudit
|
||||||
path: <path to directory>
|
path: <path to directory>
|
||||||
|
auto_flush_timeout: <seconds>
|
||||||
...
|
...
|
||||||
```
|
```
|
||||||
Here `<path to directory>` must be a path to a directory.
|
Here `<path to directory>` must be a path to a directory.
|
||||||
If the directory does not exist, it will be created.
|
If the directory does not exist, it will be created.
|
||||||
If the directory exists, it should contain a bare git repository.
|
If the directory exists, it should contain a bare git repository.
|
||||||
|
|
||||||
|
If more than `auto_flush_timeout` seconds have passed since the audit log was last flushed, pending audit log entries (if any) are persisted, resulting in a commit.
|
||||||
|
The default value for `auto_flush_timeout` is 60 seconds. The value must not be lower than `1`.
|
||||||
|
|
||||||
The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (patterns are in Python `re`-module syntax, e.g. use `'.*'` to report changes for all PIDs).
|
The command `dump-things-gitaudit-report <path to directory> <PID-pattern>` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (patterns are in Python `re`-module syntax, e.g. use `'.*'` to report changes for all PIDs).
|
||||||
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.
|
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,11 +6,18 @@ committed.
|
||||||
|
|
||||||
Changes are annotated with a time stamp and a user-id
|
Changes are annotated with a time stamp and a user-id
|
||||||
"""
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from threading import (
|
||||||
|
Lock,
|
||||||
|
Thread,
|
||||||
|
)
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from datalad_core.git_utils import apply_changeset
|
from datalad_core.git_utils import apply_changeset
|
||||||
|
|
@ -26,24 +33,61 @@ from . import AuditBackend
|
||||||
index_file_name = 'gitaudit_index.log'
|
index_file_name = 'gitaudit_index.log'
|
||||||
|
|
||||||
|
|
||||||
|
class FlushingThread(Thread):
    """Background thread that triggers periodic flushes of an audit backend.

    Roughly once per second the thread compares the current time with the
    backend's ``last_flush_time`` and calls ``backend.flush()`` when more
    than ``auto_flush_timeout`` seconds have elapsed since then.

    The thread is created as a daemon thread. Call :meth:`request_exit`
    followed by ``join()`` to stop it; the wait between two checks is
    interruptible, so the thread ends promptly after an exit request.
    """

    def __init__(
        self,
        backend: GitAuditBackend,
        auto_flush_timeout: int,
    ):
        """Create the thread; it is not started here.

        :param backend: object providing ``last_flush_time`` and ``flush()``
        :param auto_flush_timeout: seconds that have to pass since the last
            flush before the thread triggers another one
        """
        # Imported locally because the module-level import only provides
        # `Lock` and `Thread`.
        from threading import Event

        super().__init__()
        self.auto_flush_timeout = auto_flush_timeout
        # NOTE(review): this is a strong reference, so the backend presumably
        # cannot be garbage-collected while the thread is running -- confirm
        # that `GitAuditBackend.__del__` is not the only shutdown path.
        self.backend = backend
        self.exit_requested = False
        # Lets `request_exit()` interrupt the one-second wait in `run()`.
        self._exit_event = Event()
        self.daemon = True

    def request_exit(self):
        """Ask the thread to terminate; returns without waiting for it."""
        self.exit_requested = True
        self._exit_event.set()

    def run(self):
        """Check once per second whether the backend is due for a flush."""
        while not self.exit_requested:
            # `wait()` returns True as soon as an exit was requested, so
            # shutdown does not have to sit out the rest of the interval and
            # no further flush is triggered after the request.
            if self._exit_event.wait(1):
                break
            if time.time() - self.backend.last_flush_time > self.auto_flush_timeout:
                self.backend.flush()
|
||||||
|
|
||||||
|
|
||||||
class GitAuditBackend(AuditBackend):
|
class GitAuditBackend(AuditBackend):
|
||||||
|
|
||||||
def __init__(
    self,
    path: Path,
    auto_flush_timeout: int = 60,
):
    """Initialize the backend state, the repository, and the flushing thread.

    :param path: location of the audit store, kept as ``self.path``
    :param auto_flush_timeout: number of seconds handed to the
        ``FlushingThread`` that periodically calls ``flush()``; must be
        at least 1
    :raises ValueError: if ``auto_flush_timeout`` is lower than 1
    """
    self.path = path
    self.index_path = None
    self.cached_index_entries = []
    self.current_change_set = {}
    # Guards the cached index entries and the pending change set; it is
    # taken by `add_record`, `flush` and the audit log readers.
    self.lock = Lock()
    self.last_flush_time = 0
    # Assigned before anything below can raise, so that `__del__` finds the
    # attribute even on a partially constructed instance.
    self.flushing_thread = None
    if auto_flush_timeout < 1:
        raise ValueError('auto_flush_timeout must be greater or equal to 1')
    # Set up the repository before the flushing thread exists: the thread
    # must not call `flush()` on a half-initialized backend, and no thread
    # is left running if the repository setup fails.
    self._init_repo()
    self.flushing_thread = FlushingThread(self, auto_flush_timeout)
    self.flushing_thread.start()
|
||||||
|
|
||||||
|
def __del__(self):
    """Persist pending changes and stop the flushing thread.

    The flushing thread is stopped even if the final ``flush()`` raises.
    """
    try:
        self.flush()
    finally:
        # `__init__` raises `ValueError` for an invalid timeout before
        # `flushing_thread` is assigned, and `__del__` still runs on such a
        # partially constructed instance; hence the defensive lookup.
        flushing_thread = getattr(self, 'flushing_thread', None)
        if flushing_thread:
            flushing_thread.request_exit()
            flushing_thread.join()
            self.flushing_thread = None
|
||||||
|
|
||||||
def add_record(
|
def add_record(
|
||||||
self,
|
self,
|
||||||
record: dict,
|
record: dict,
|
||||||
committer_id: str,
|
committer_id: str,
|
||||||
author_id: str | None = None,
|
author_id: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
with self.lock:
|
||||||
author_id = committer_id if author_id is None else author_id
|
author_id = committer_id if author_id is None else author_id
|
||||||
record_id = record['pid']
|
record_id = record['pid']
|
||||||
location = self._get_location_for(record_id)
|
location = self._get_location_for(record_id)
|
||||||
|
|
@ -52,18 +96,30 @@ class GitAuditBackend(AuditBackend):
|
||||||
self._add_elements(record_id, location, committer_id, author_id, record)
|
self._add_elements(record_id, location, committer_id, author_id, record)
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
|
with self.lock:
|
||||||
|
self._locked_flush()
|
||||||
|
|
||||||
|
def _locked_flush(self):
|
||||||
if self.current_change_set:
|
if self.current_change_set:
|
||||||
self._persist_pending_changes()
|
self._persist_pending_changes()
|
||||||
if self.cached_index_entries:
|
if self.cached_index_entries:
|
||||||
with self.index_path.open('at') as f:
|
with self.index_path.open('at') as f:
|
||||||
f.write('\n'.join(self.cached_index_entries) + '\n')
|
f.write('\n'.join(self.cached_index_entries) + '\n')
|
||||||
self.cached_index_entries = []
|
self.cached_index_entries = []
|
||||||
|
self.last_flush_time = time.time()
|
||||||
|
|
||||||
def get_audit_log(
|
def get_audit_log(
|
||||||
self,
|
self,
|
||||||
record_id: str,
|
record_id: str,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
self.flush()
|
with self.lock:
|
||||||
|
return self._locked_get_audit_log(record_id)
|
||||||
|
|
||||||
|
def _locked_get_audit_log(
|
||||||
|
self,
|
||||||
|
record_id: str,
|
||||||
|
) -> dict:
|
||||||
|
self._locked_flush()
|
||||||
|
|
||||||
# Get all commits that updated the log. Those will also have updated
|
# Get all commits that updated the log. Those will also have updated
|
||||||
# the records
|
# the records
|
||||||
|
|
@ -120,7 +176,14 @@ class GitAuditBackend(AuditBackend):
|
||||||
self,
|
self,
|
||||||
record_id_pattern: str,
|
record_id_pattern: str,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
self.flush()
|
with self.lock:
|
||||||
|
return self._locked_get_audit_logs(record_id_pattern)
|
||||||
|
|
||||||
|
def _locked_get_audit_logs(
|
||||||
|
self,
|
||||||
|
record_id_pattern: str,
|
||||||
|
) -> dict:
|
||||||
|
self._locked_flush()
|
||||||
matcher = re.compile(record_id_pattern)
|
matcher = re.compile(record_id_pattern)
|
||||||
matching_ids = tuple(
|
matching_ids = tuple(
|
||||||
filter(
|
filter(
|
||||||
|
|
@ -129,7 +192,7 @@ class GitAuditBackend(AuditBackend):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return {
|
return {
|
||||||
record_id: self.get_audit_log(record_id)
|
record_id: self._locked_get_audit_log(record_id)
|
||||||
for record_id in sorted(matching_ids)
|
for record_id in sorted(matching_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -133,6 +133,7 @@ class ConfigAuthConfig(StrictModel):
|
||||||
class GitAuditBackendConfig(StrictModel):
|
class GitAuditBackendConfig(StrictModel):
|
||||||
type: Literal['gitaudit']
|
type: Literal['gitaudit']
|
||||||
path: Path
|
path: Path
|
||||||
|
auto_flush_timeout: int = 60
|
||||||
|
|
||||||
|
|
||||||
class TagConfig(StrictModel):
|
class TagConfig(StrictModel):
|
||||||
|
|
@ -458,7 +459,7 @@ def process_config_object(
|
||||||
instance_config.audit_backends[collection_name] = []
|
instance_config.audit_backends[collection_name] = []
|
||||||
for audit_backend in collection_info.audit_backends:
|
for audit_backend in collection_info.audit_backends:
|
||||||
instance_config.audit_backends[collection_name].append(
|
instance_config.audit_backends[collection_name].append(
|
||||||
GitAuditBackend(audit_backend.path)
|
GitAuditBackend(audit_backend.path, audit_backend.auto_flush_timeout)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create validator for each collection
|
# Create validator for each collection
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ collections:
|
||||||
audit_backends:
|
audit_backends:
|
||||||
- type: gitaudit
|
- type: gitaudit
|
||||||
path: {{audit_store_path}}
|
path: {{audit_store_path}}
|
||||||
|
auto_flush_timeout: 2
|
||||||
collection_2:
|
collection_2:
|
||||||
default_token: basic_access
|
default_token: basic_access
|
||||||
curated: {curated}/collection_2
|
curated: {curated}/collection_2
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import time
|
||||||
import yaml
|
import yaml
|
||||||
|
from itertools import count
|
||||||
|
|
||||||
from dump_things_service import (
|
from dump_things_service import (
|
||||||
HTTP_200_OK,
|
HTTP_200_OK,
|
||||||
|
|
@ -147,3 +149,41 @@ def test_audit_backend(fastapi_client_simple):
|
||||||
assert values[i][0] == user_names[i]
|
assert values[i][0] == user_names[i]
|
||||||
assert values[i][1] == f'author_{i}@www.org'
|
assert values[i][1] == f'author_{i}@www.org'
|
||||||
assert yaml.safe_load(values[i][3]) == json_objects[i]
|
assert yaml.safe_load(values[i][3]) == json_objects[i]
|
||||||
|
|
||||||
|
|
||||||
|
def test_audit_backend_auto_flush(fastapi_client_simple):
    """Check that pending audit log entries get persisted by the auto flush.

    Two versions of the same record are posted, which must leave
    unpersisted changes in the audit backend of `collection_1`. The test
    then polls the backend until its pending change set is empty, i.e.
    until the flushing thread has persisted the changes.
    """
    test_client, _ = fastapi_client_simple

    record_id = 'abc:audit-trailed'
    names = 'Robert', 'Anton'
    tokens = 'token_1_xxxxx', 'token_admin'
    json_objects = tuple(
        {
            'schema_type': 'abc:Person',
            'pid': record_id,
            'given_name': name,
        }
        for name in names
    )

    for i, (token, json_object) in enumerate(zip(tokens, json_objects)):
        response = test_client.post(
            f'/collection_1/curated/record/Person?author_id=author_{i}@www.org',
            headers={'x-dumpthings-token': token},
            json=json_object,
        )
        assert response.status_code == HTTP_200_OK

    config_instance = get_config()
    audit_backend = config_instance.audit_backends['collection_1'][0]

    assert audit_backend.current_change_set, 'expected unpersisted changes in audit log'

    # Poll at most ten times, one second apart. Presumably the test
    # configuration sets a short `auto_flush_timeout` for this collection,
    # so the flush should happen well within that window -- verify against
    # the test config.
    max_checks = 10
    for remaining in reversed(range(max_checks)):
        if not audit_backend.current_change_set:
            break
        if remaining:
            time.sleep(1)
    else:
        pytest.fail('auto flush did not trigger within 10 seconds')
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue