From f20daac10261bf407a3685c23ec0e87c4f054230 Mon Sep 17 00:00:00 2001 From: Christian Monch Date: Sun, 22 Mar 2026 10:30:26 +0100 Subject: [PATCH] add gitaudit auto_flush_timeout This commit adds the configuration `auto_flush_timeout` to the gitaudit-configuration. If `auto_flush_timeout` seconds have passed without adding a new audit log entry and there is a non-empty changeset, i.e., the changeset was last touched more than `auto_flush_timeout` seconds ago, the changeset will be persisted. This results in a commit. For example, if `auto_flush_timeout` is 60, but every 50 seconds a log-entry for a new record is added to the gitaudit-instance, the automated persisting of the changeset will never be triggered (the changeset might still be persisted, depending on the content of the changes). If, on the other hand, a single change was added and no other change is added in the next `auto_flush_timeout` seconds, the single change will be persisted, resulting in a commit with a single change. --- CHANGELOG.md | 11 +++ README.md | 5 +- dump_things_service/audit/gitaudit.py | 81 ++++++++++++++++++++--- dump_things_service/config.py | 3 +- dump_things_service/tests/fixtures.py | 1 + dump_things_service/tests/test_curated.py | 40 +++++++++++ 6 files changed, 130 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ebcaa4d..c60cc75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +# 5.6.1 (2026-03-20) + +## Bugfixes + +- Ensure that pending gitaudit log entries are always persisted after + a defined *timeout*. In version 5.6.0 pending changes could have been + cached until the server is shutdown, and only be written then. + This change ensures that `dump-things-gitaudit-report` will report + all changes after *timeout* seconds. 
+ + # 5.6.0 (2026-03-20) ## New features diff --git a/README.md b/README.md index dbac399..50559f9 100644 --- a/README.md +++ b/README.md @@ -520,13 +520,16 @@ collections: audit-backends: - type: gitaudit path: - + auto_flush_timeout: ... ``` Here `` must be a path to a directory. If the directory does not exist, it will be created. If the directory exists, it should contain a bare git repository. +If `auto_flush_timeout` seconds have passed without adding new audit log entries, the current changeset is persisted, resulting in a commit. +The default value for `auto_flush_timeout` is 60 seconds. The value must not be lower than `1`. + The command `dump-things-gitaudit-report ` can be used to show the audit-log for all PIDs that match the given `PID`-pattern (pattern are in python `re`-module syntax, i.e. use `'.*'` to report changes for all PIDs). Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record. diff --git a/dump_things_service/audit/gitaudit.py b/dump_things_service/audit/gitaudit.py index 94e2ea5..b3afaed 100644 --- a/dump_things_service/audit/gitaudit.py +++ b/dump_things_service/audit/gitaudit.py @@ -6,11 +6,18 @@ committed. Changes are annotated with a time stamp and a user-id """ +from __future__ import annotations + import hashlib import json import re +import time from datetime import datetime from pathlib import Path +from threading import ( + Lock, + Thread, +) import yaml from datalad_core.git_utils import apply_changeset @@ -26,44 +33,93 @@ from . 
import AuditBackend
 
 index_file_name = 'gitaudit_index.log'
 
 
+class FlushingThread(Thread):
+    def __init__(
+        self,
+        backend: GitAuditBackend,
+        auto_flush_timeout: int,
+    ):
+        super().__init__()
+        self.auto_flush_timeout = auto_flush_timeout
+        self.backend = backend
+        self.exit_requested = False
+        self.daemon = True
+
+    def request_exit(self):
+        self.exit_requested = True
+
+    def run(self):
+        while not self.exit_requested:
+            time.sleep(1)
+            if time.time() - self.backend.last_flush_time > self.auto_flush_timeout:
+                self.backend.flush()
+
+
 class GitAuditBackend(AuditBackend):
     def __init__(
         self,
         path: Path,
+        auto_flush_timeout: int = 60,
     ):
         self.path = path
         self.index_path = None
         self.cached_index_entries = []
         self.current_change_set = {}
+        self.lock = Lock()
+        self.last_flush_time = time.time()
+        if auto_flush_timeout < 1:
+            raise ValueError('auto_flush_timeout must be greater or equal to 1')
+        self.flushing_thread = FlushingThread(self, auto_flush_timeout)
+        self.flushing_thread.start()
         self._init_repo()
 
+    def __del__(self):
+        self.flush()
+        if self.flushing_thread:
+            self.flushing_thread.request_exit()
+            self.flushing_thread.join()
+            self.flushing_thread = None
+
     def add_record(
         self,
         record: dict,
         committer_id: str,
         author_id: str | None = None,
     ) -> None:
-        author_id = committer_id if author_id is None else author_id
-        record_id = record['pid']
-        location = self._get_location_for(record_id)
-        if self._has_pending_changes(location):
-            self._persist_pending_changes()
-        self._add_elements(record_id, location, committer_id, author_id, record)
+        with self.lock:
+            record_id = record['pid']
+            location = self._get_location_for(record_id)
+            if self._has_pending_changes(location):
+                self._persist_pending_changes()
+            self._add_elements(record_id, location, committer_id, committer_id if author_id is None else author_id, record)
+            self.last_flush_time = time.time()
 
     def flush(self):
+        with self.lock:
+            self._locked_flush()
+
+    def _locked_flush(self):
         if self.current_change_set:
self._persist_pending_changes() if self.cached_index_entries: with self.index_path.open('at') as f: f.write('\n'.join(self.cached_index_entries) + '\n') self.cached_index_entries = [] + self.last_flush_time = time.time() def get_audit_log( self, record_id: str, ) -> dict: - self.flush() + with self.lock: + return self._locked_get_audit_log(record_id) + + def _locked_get_audit_log( + self, + record_id: str, + ) -> dict: + self._locked_flush() # Get all commits that updated the log. Those will also have updated # the records @@ -120,7 +176,14 @@ class GitAuditBackend(AuditBackend): self, record_id_pattern: str, ) -> dict: - self.flush() + with self.lock: + return self._locked_get_audit_logs(record_id_pattern) + + def _locked_get_audit_logs( + self, + record_id_pattern: str, + ) -> dict: + self._locked_flush() matcher = re.compile(record_id_pattern) matching_ids = tuple( filter( @@ -129,7 +192,7 @@ class GitAuditBackend(AuditBackend): ) ) return { - record_id: self.get_audit_log(record_id) + record_id: self._locked_get_audit_log(record_id) for record_id in sorted(matching_ids) } diff --git a/dump_things_service/config.py b/dump_things_service/config.py index b47b236..aa373ca 100644 --- a/dump_things_service/config.py +++ b/dump_things_service/config.py @@ -133,6 +133,7 @@ class ConfigAuthConfig(StrictModel): class GitAuditBackendConfig(StrictModel): type: Literal['gitaudit'] path: Path + auto_flush_timeout: int = 60 class TagConfig(StrictModel): @@ -458,7 +459,7 @@ def process_config_object( instance_config.audit_backends[collection_name] = [] for audit_backend in collection_info.audit_backends: instance_config.audit_backends[collection_name].append( - GitAuditBackend(audit_backend.path) + GitAuditBackend(audit_backend.path, audit_backend.auto_flush_timeout) ) # Create validator for each collection diff --git a/dump_things_service/tests/fixtures.py b/dump_things_service/tests/fixtures.py index 5eb4cb7..491553e 100644 --- a/dump_things_service/tests/fixtures.py +++ 
b/dump_things_service/tests/fixtures.py
@@ -45,6 +45,7 @@ collections:
   audit_backends:
     - type: gitaudit
       path: {{audit_store_path}}
+      auto_flush_timeout: 2
   collection_2:
     default_token: basic_access
     curated: {curated}/collection_2
diff --git a/dump_things_service/tests/test_curated.py b/dump_things_service/tests/test_curated.py
index e6a99a3..7eebee2 100644
--- a/dump_things_service/tests/test_curated.py
+++ b/dump_things_service/tests/test_curated.py
@@ -1,7 +1,9 @@
 from __future__ import annotations
 
 import pytest
+import time
 import yaml
+from itertools import count
 
 from dump_things_service import (
     HTTP_200_OK,
@@ -147,3 +149,41 @@ def test_audit_backend(fastapi_client_simple):
         assert values[i][0] == user_names[i]
         assert values[i][1] == f'author_{i}@www.org'
         assert yaml.safe_load(values[i][3]) == json_objects[i]
+
+
+def test_audit_backend_auto_flush(fastapi_client_simple):
+    test_client, _ = fastapi_client_simple
+
+    record_id = 'abc:audit-trailed'
+    names = 'Robert', 'Anton'
+    tokens = 'token_1_xxxxx', 'token_admin'
+    user_names = 'test_user_1_curated', 'test_admin'
+    json_objects = tuple(
+        {
+            'schema_type': 'abc:Person',
+            'pid': record_id,
+            'given_name': name,
+        }
+        for name in names
+    )
+
+    for i in range(2):
+        response = test_client.post(
+            f'/collection_1/curated/record/Person?author_id=author_{i}@www.org',
+            headers={'x-dumpthings-token': tokens[i]},
+            json=json_objects[i],
+        )
+        assert response.status_code == HTTP_200_OK
+
+    config_instance = get_config()
+    audit_backend = config_instance.audit_backends['collection_1'][0]
+
+    assert audit_backend.current_change_set, 'expected unpersisted changes in audit log'
+
+    # Wait up to 10 seconds for the flushing thread to persist the changeset.
+    for i in count():
+        if not audit_backend.current_change_set:
+            break
+        if i == 10:
+            raise ValueError('auto flush did not trigger within 10 seconds')
+        time.sleep(1)
-- 
2.52.0