Add audit trail functionality for collections #198

Merged
cmo merged 17 commits from audit-trail into master 2026-03-17 09:03:05 +00:00
13 changed files with 531 additions and 3 deletions

View file

@@ -4,6 +4,11 @@ jobs:
  Test-all:
    runs-on: ubuntu-latest
    steps:
      - name: Set up environment
        run: |
          git config --global user.email "test@example.org"
          git config --global user.name "CI Tester"
      - name: Check out repository code
        uses: actions/checkout@v4

View file

@@ -1,4 +1,15 @@
# 5.5.1 (2026-03-10)
# x.x.x ()
## New features
- Support for audit backends was added to `dump-things-service`. Currently there
  is one audit backend type: `gitaudit`. An audit backend stores provenance
  information about records, i.e., who changed what at which time.
- The new tool `dump-things-report-gitaudit` reports audit information for
  individual PIDs, i.e., the timestamps, user IDs, associated diffs, and the
  resulting records.
## Improvements

View file

@@ -502,6 +502,38 @@ collections:
```
#### Audit Backends
The service supports audit logs of changes that are made via the curation interface.
Audit logs are configured per collection via the key `audit_backends`,
which expects a list of audit-backend configurations.
Currently the only supported audit-backend type is `gitaudit`:
```yaml
type: collections
version: 1
collections:
  collection_1:
    ...
    audit_backends:
      - type: gitaudit
        path: <path to directory>
    ...
```
Here `<path to directory>` must be a directory path.
If the directory does not exist, it will be created; if it already exists, it should contain a bare git repository.
The command `dump-things-report-gitaudit <path to directory> <PID>` shows the audit log for the given `PID`.
Each log entry contains the timestamp of the change, the ID of the curator that posted the change, a diff of the change, and the resulting record.
Note: currently the user ID of the curator is stored as the author of the audit-log entries.
The "original" author of a change is usually identified in the `annotations` field of the record.
### Endpoints
Most endpoints require a *collection*. These correspond to the names of the "data record collection"-directories (for example `myschema-v3-fmta` in [Dump Things Service](https://concepts.datalad.org/dump-things-storage-v0/)) in the stores.

View file

@@ -0,0 +1,60 @@
from abc import (
    ABCMeta,
    abstractmethod,
)


class AuditBackend(metaclass=ABCMeta):
    @abstractmethod
    def add_record(
        self,
        record_id: str,
        record: dict,
        user_id: str,
    ) -> None:
        """Add information about a new record version to the audit log

        :param record_id: the ID of the record (this is usually `record['pid']`).
        :param record: the content of the new record (will be stored in YAML format).
        :param user_id: the ID of the user who adds the record.
        """
        raise NotImplementedError

    @abstractmethod
    def flush(self):
        """Ensure that all audit-log entries are persisted on disk

        After `flush()` returns, external tools should be able to pick up all
        log entries from the persisted data.
        """
        raise NotImplementedError

    @abstractmethod
    def get_audit_log(
        self,
        record_id: str,
    ) -> dict:
        """Get the content of the audit log

        All diffs and content are communicated in YAML format.

        :param record_id: the ID of the record (as given in the parameter
            `record_id` in the call to `add_record`).
        :return: A dictionary where the keys are time stamps of the changes
            and the values are tuples containing the elements
            (user_id, diff, resulting_record), where `user_id` is the
            `user_id` that was used in `add_record`, `resulting_record` is
            the YAML-representation of `record` that was given to
            `add_record`, and `diff` is a patch that transfers the previous
            version of the record to the version provided in `record` (in
            git-diff format).
        """
        raise NotImplementedError
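To illustrate the interface contract (not part of this changeset), a minimal custom backend could look like the sketch below. It assumes `AuditBackend` is importable from the `dump_things_service.audit` package, as the `from . import AuditBackend` in the git backend suggests; the in-memory storage, timestamp format, and empty diff slot are illustrative only:

```python
from datetime import datetime, timezone

import yaml

from dump_things_service.audit import AuditBackend  # assumed import path


class MemoryAuditBackend(AuditBackend):
    """Illustrative, non-persistent backend; entries vanish when the process exits."""

    def __init__(self):
        self._log = {}

    def add_record(self, record_id: str, record: dict, user_id: str) -> None:
        # A real backend would also compute a diff against the previous version;
        # here the diff slot is left empty.
        time_stamp = datetime.now(timezone.utc).isoformat()
        resulting_record = yaml.safe_dump(record, sort_keys=False)
        self._log.setdefault(record_id, {})[time_stamp] = (user_id, '', resulting_record)

    def flush(self):
        # Nothing to persist for an in-memory backend.
        pass

    def get_audit_log(self, record_id: str) -> dict:
        return dict(self._log.get(record_id, {}))
```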

View file

@@ -0,0 +1,199 @@
"""A git-based audit backend
The backend minimizes commits by caching changes until an already
changed record is changed again. In this case all changes are
committed.
Changes are annotated with a time stamp and a user-id
"""
import hashlib
from datetime import datetime
from pathlib import Path
import yaml
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo
from datalad_core.runners import (
call_git,
CommandError,
)
from . import AuditBackend
class GitAuditBackend(AuditBackend):
def __init__(
self,
path: Path,
):
self.path = path
self.cache = {}
self.current_change_set = {}
self.repo = self._init_repo()
def add_record(
self,
record_id: str,
record: dict,
user_id: str,
) -> None:
location = self._get_location_for(record_id)
if self._has_pending_changes(location):
self._persist_pending_changes()
self._add_elements(location, user_id, record)
def flush(self):
if self.current_change_set:
self._persist_pending_changes()
def get_audit_log(
self,
record_id: str,
) -> dict:
self.flush()
# Get all commits that updated the log. Those will also have updated
# the records
changes = []
yaml_location, log_location = map(str, self._get_location_for(record_id)[1:])
commit_hashes = call_git(
['log', '--format=%H', '--', log_location],
cwd=self.path,
capture_output=True,
).decode().splitlines()
for commit_hash in commit_hashes:
log_diff_lines = call_git(
['show', '--format=%b', commit_hash, '--', log_location],
cwd=self.path,
capture_output=True,
).decode().splitlines()
# Get the log entry
log_entry = tuple(
filter(
lambda l: not l.startswith('+++') and l.startswith('+'),
log_diff_lines,
)
)[0][1:]
time_stamp, user_id = log_entry.split(' ', 1)
# Get the YAML diff
yaml_diff_lines = call_git(
['show', '--format=%b', commit_hash, '--', yaml_location],
cwd=self.path,
capture_output=True,
).decode().splitlines()
yaml_diff = '\n'.join(filter(lambda l: l != '', yaml_diff_lines)) + '\n'
# Get the YAML content
yaml_content = call_git(
['show', f'{commit_hash}:{yaml_location}'],
cwd=self.path,
capture_output=True,
).decode()
changes.append((time_stamp, user_id, yaml_diff, yaml_content))
changes.sort()
return {c[0]: c[1:] for c in changes}
def _add_elements(
self,
location: tuple[str, Path, Path],
user_id: str,
record: dict,
) -> bool:
existing_record = self._read_record_from_repo_path(location[1])
if existing_record != record:
self.current_change_set[location[1]] = yaml.dump(
data=record,
sort_keys=False,
allow_unicode=True,
default_flow_style=False,
)
self._add_log_entry(location[2], user_id)
return True
return False
def _add_log_entry(
self,
log_location: Path,
user_id: str,
) -> None:
time_stamp = datetime.now().isoformat()
log_content = self._read_from_repo_path(log_location).decode()
log_content += f'{time_stamp} {user_id}\n'
self.current_change_set[log_location] = log_content
def _read_from_repo_path(
self,
path: Path,
) -> bytes:
try:
return call_git(
['cat-file', '-p', f'master:{str(path)}'],
cwd=self.path,
capture_output=True,
)
except CommandError as e:
if e.returncode == 128:
return b''
raise
def _read_record_from_repo_path(
self,
path: Path,
):
return yaml.safe_load(self._read_from_repo_path(path))
def _has_pending_changes(
self,
location: tuple[str, Path, Path],
) -> bool:
log_pending = location[1] in self.current_change_set
record_pending = location[2] in self.current_change_set
if log_pending != record_pending:
msg = (
f'change status mismatch: changed: '
f'{location[1]} ({log_pending}), '
f'{location[2]} ({record_pending})'
)
raise SystemError(msg)
return log_pending
def _persist_pending_changes(self) -> None:
apply_changeset(
self.repo,
self.current_change_set,
message='persist changes',
)
self.current_change_set = {}
def _get_location_for(
self,
record_id: str,
) -> tuple[str, Path, Path]:
base = hashlib.sha1(record_id.encode()).hexdigest()
dir_1, dir_2, name = base[0:3], base[3:6], base[6:]
location_dir = Path(dir_1) / Path(dir_2)
return (
base,
location_dir / (base + '.yaml'),
location_dir / (base + '.log'),
)

    def _init_repo(self) -> Repo:
        if self.path.exists():
            is_empty = len(tuple(Path(self.path).glob('**'))) == 1
        else:
            self.path.mkdir(parents=True)
            is_empty = True
        if is_empty:
            call_git(['init', '--bare', str(self.path)], cwd=self.path)
        self.repo = Repo(self.path)
        apply_changeset(
            self.repo,
            {'README.txt': 'A git-based audit backend\n'},
            message='add README.txt',
        )
        return self.repo
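A minimal usage sketch of this backend outside the service (the store path, record, and user ID are made up; note that changes are cached and only committed on `flush()` or when an already-changed record is changed again):

```python
from pathlib import Path

from dump_things_service.audit.gitaudit import GitAuditBackend

# Hypothetical store location; the directory is created (and `git init --bare`-ed) if needed.
backend = GitAuditBackend(Path('/tmp/audit-demo'))

backend.add_record(
    record_id='abc:example',
    record={'pid': 'abc:example', 'given_name': 'Frederick'},
    user_id='curator@example.org',
)
backend.flush()  # persist the cached change as a commit (get_audit_log() also flushes)

# Keys are ISO timestamps, values are (user_id, diff, resulting_record) tuples.
for time_stamp, (user_id, diff, record_yaml) in backend.get_audit_log('abc:example').items():
    print(time_stamp, user_id)
    print(record_yaml)
```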

View file

@@ -0,0 +1,103 @@
import subprocess
from pathlib import Path

from dump_things_service.audit.gitaudit import GitAuditBackend


def _get_git_log(path: Path) -> list[str]:
    result = subprocess.run(
        ['git', '-C', str(path), 'log', '--oneline'],
        capture_output=True,
        check=True,
    )
    return result.stdout.decode().splitlines()


def _get_audit_log_lines(backend: GitAuditBackend, record_id: str) -> list[str]:
    locations = backend._get_location_for(record_id)
    return backend._read_from_repo_path(locations[2]).decode().splitlines()


def test_gitaudit_basic(tmp_path_factory):
    tmp_path = tmp_path_factory.mktemp("gitaudit_backend")
    backend = GitAuditBackend(tmp_path)

    record_id = 'test_gitaudit_basic'
    for index in range(4):
        backend.add_record(
            record_id=record_id,
            record={'pid': record_id, 'content': index},
            user_id=f'tester_{index}@example.com',
        )

    # Check that the log file has 4 entries
    backend.flush()
    log_lines = _get_audit_log_lines(backend, record_id)
    assert len(log_lines) == 4

    # Check that the commit log has 4 + 1 (from `README.txt`) entries
    commit_log_lines = _get_git_log(tmp_path)
    assert len(commit_log_lines) == 5

    # Check that the changes are reported
    changes = backend.get_audit_log(record_id)
    assert len(changes) == 4
    assert tuple(map(lambda e: e[0], changes.values())) == tuple((f'tester_{i}@example.com' for i in range(4)))


def test_gitaudit_identical_change(tmp_path_factory):
    tmp_path = tmp_path_factory.mktemp("gitaudit_backend")
    backend = GitAuditBackend(tmp_path)

    record_id = 'test_gitaudit_idempotent'
    backend.add_record(
        record_id=record_id,
        record={'pid': record_id},
        user_id='tester@example.com',
    )
    backend.add_record(
        record_id=record_id,
        record={'pid': record_id},
        user_id='tester@example.com',
    )

    # Check that there is only one entry in the audit log
    log_lines = _get_audit_log_lines(backend, record_id)
    assert len(log_lines) == 1

    # Check that there are two entries in the commit history, one for the
    # `README.txt`-file, one for the log entries.
    commit_log_lines = _get_git_log(tmp_path)
    assert len(commit_log_lines) == 2

    # Check that the changes are reported
    changes = backend.get_audit_log(record_id)
    assert len(changes) == 1


def test_gitaudit_huge_log(tmp_path_factory):
    tmp_path = tmp_path_factory.mktemp("gitaudit_backend")
    backend = GitAuditBackend(tmp_path)

    change_number = 2
    record_number = 100
    for j in range(change_number):
        for i in range(record_number):
            record_id = f'huge_{i}'
            backend.add_record(
                record_id=record_id,
                record={'pid': record_id, 'content': f'j:{j}, i:{i}'},
                user_id='tester@example.com',
            )

    # Check that the changes are reported
    for i in range(record_number):
        record_id = f'huge_{i}'
        changes = backend.get_audit_log(record_id)
        assert len(changes) == change_number

View file

@@ -0,0 +1,47 @@
from __future__ import annotations

import json
import sys
from argparse import ArgumentParser
from pathlib import Path

from dump_things_service.audit.gitaudit import GitAuditBackend

parser = ArgumentParser(
    prog='Report audit information for a PID',
    description='Report the audit information that was stored for a specific '
                'PID. For every change to a record the tool will report: '
                'time stamp, user ID, diff, and the resulting record.',
)
parser.add_argument(
    'audit_store',
    help='The path to the gitaudit store',
)
parser.add_argument(
    'pid',
    help='The PID of the record for which audit information should be reported.',
)


def main():
    arguments = parser.parse_args()

    audit_backend = GitAuditBackend(Path(arguments.audit_store))
    changes = audit_backend.get_audit_log(arguments.pid)
    output = {
        time_stamp: {
            'user-id': change[0],
            'diff': change[1],
            'resulting-record': change[2],
        }
        for time_stamp, change in changes.items()
    }
    print(json.dumps(output, indent=2, ensure_ascii=False))
    return 0


if __name__ == '__main__':
    sys.exit(main())

View file

@@ -27,6 +27,7 @@ from dump_things_service import (
    HTTP_404_NOT_FOUND,
    Format,
)
from dump_things_service.audit.gitaudit import GitAuditBackend
from dump_things_service.backends.record_dir import RecordDirStore
from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
from dump_things_service.backends.sqlite import SQLiteBackend
@@ -129,6 +130,11 @@ class ConfigAuthConfig(StrictModel):
    type: Literal['config'] = 'config'


class GitAuditBackendConfig(StrictModel):
    type: Literal['gitaudit']
    path: Path


class TagConfig(StrictModel):
    submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
    submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
@@ -143,6 +149,7 @@ class CollectionConfig(StrictModel):
    submission_tags: TagConfig = TagConfig()
    use_classes: list[str] = dataclasses.field(default_factory=list)
    ignore_classes: list[str] = dataclasses.field(default_factory=list)
    audit_backends: list[GitAuditBackendConfig] = dataclasses.field(default_factory=list)


class GlobalConfig(StrictModel):
@@ -174,6 +181,7 @@ class InstanceConfig:
    validators: dict = dataclasses.field(default_factory=dict)
    use_classes: dict = dataclasses.field(default_factory=dict)
    maintenance_mode: set = dataclasses.field(default_factory=set)
    audit_backends: dict = dataclasses.field(default_factory=dict)

mode_mapping = {
    TokenModes.READ_CURATED: TokenPermission(curated_read=True),
@@ -446,6 +454,13 @@ def process_config_object(
        # authentication routine.
        instance_config.token_stores[collection_name] = {}

        # Generate audit backends
        instance_config.audit_backends[collection_name] = []
        for audit_backend in collection_info.audit_backends:
            instance_config.audit_backends[collection_name].append(
                GitAuditBackend(audit_backend.path)
            )

    # Create validator for each collection
    for collection_name, _ in config_object.collections.items():
        instance_config.validators[collection_name] = FormatConverter(

View file

@@ -374,3 +374,10 @@ async def store_curated_record(
        class_name,
        json_object,
    )

    for audit_backend in instance_config.audit_backends[collection]:
        audit_backend.add_record(
            record_id=pid,
            record=json_object,
            user_id=instance_config.tokens[collection][api_key]['user_id'],
        )
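For illustration, this is the kind of request that now produces an audit-log entry, mirroring the service test further below (host, token, and record values are made up and depend on the deployment's configuration; `requests` is just one possible HTTP client):

```python
import requests

response = requests.post(
    # Collection, class, and record are taken from the test setup; the host is hypothetical.
    'http://localhost:8000/collection_1/curated/record/Person',
    headers={'x-dumpthings-token': 'token_1_xxxxx'},
    json={'schema_type': 'abc:Person', 'pid': 'abc:audit-trailed', 'given_name': 'Frederick'},
)
assert response.status_code == 200
```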

View file

@@ -42,6 +42,9 @@ collections:
    submission_tags:
      submitter_id_tag: oxo:NCIT_C54269
      submission_time_tag: https://time
    audit_backends:
      - type: gitaudit
        path: {{audit_store_path}}
  collection_2:
    default_token: basic_access
    curated: {curated}/collection_2
@@ -269,7 +272,10 @@ tokens:
@pytest.fixture(scope='session')
def dump_stores_simple(tmp_path_factory):
    tmp_path = tmp_path_factory.mktemp('dump_store')
    (tmp_path / config_file_name).write_text(global_config_text)
    audit_store_path = tmp_path_factory.mktemp('audit_store')
    final_config_text = global_config_text.format(audit_store_path=str(audit_store_path))
    (tmp_path / config_file_name).write_text(final_config_text)

    default_entries = {
        f'collection_{i}': [('Person', pid, test_record)] for i in range(1, 9)
@@ -289,7 +295,7 @@ def dump_stores_simple(tmp_path_factory):
    default_entries['collection_dlflatsocial-2'] = [('Person', pid_trr, test_record_trr)]
    create_store(
        root_dir=tmp_path,
        config=GlobalConfig(**yaml.safe_load(global_config_text)),
        config=GlobalConfig(**yaml.safe_load(final_config_text)),
        per_collection_info={
            'collection_1': (str(schema_path), 'digest-md5'),
            'collection_2': (str(schema_path), 'digest-md5-p3'),

View file

@@ -1,11 +1,13 @@
from __future__ import annotations

import pytest
import yaml

from dump_things_service import (
    HTTP_200_OK,
    HTTP_404_NOT_FOUND,
)
from dump_things_service.config import get_config

delete_record = {
    'schema_type': 'abc:Person',
@@ -109,3 +111,38 @@ def test_curated_delete(fastapi_client_simple):
        headers={'x-dumpthings-token': 'token_1_xxxxx'},
    )
    assert response.status_code == HTTP_404_NOT_FOUND


def test_audit_backend(fastapi_client_simple):
    test_client, _ = fastapi_client_simple

    record_id = 'abc:audit-trailed'
    names = 'Frederick', 'Johny'
    tokens = 'token_1_xxxxx', 'token_admin'
    user_names = 'test_user_1_curated', 'test_admin'
    json_objects = tuple(
        {
            'schema_type': 'abc:Person',
            'pid': record_id,
            'given_name': name,
        }
        for name in names
    )
    for i in range(2):
        response = test_client.post(
            '/collection_1/curated/record/Person',
            headers={'x-dumpthings-token': tokens[i]},
            json=json_objects[i],
        )
        assert response.status_code == HTTP_200_OK

    config_instance = get_config()
    audit_backend = config_instance.audit_backends['collection_1'][0]
    changes = audit_backend.get_audit_log(record_id)
    assert len(changes) == 2
    values = tuple(changes.values())
    for i in range(2):
        assert values[i][0] == user_names[i]
        assert yaml.safe_load(values[i][2]) == json_objects[i]

View file

@@ -2,6 +2,10 @@
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.metadata]
# This is required for the git+https dependency
allow-direct-references = true

[project]
name = "dump-things-service"
dynamic = ["version"]
@@ -27,6 +31,7 @@ classifiers = [
dependencies = [
    "aiohttp",
    "click",
    "datalad-core @ git+https://hub.datalad.org/datalad/datalad-core@minilad",
    "fastapi[standard]",
    "fastapi-pagination",
    "fsspec",
@@ -50,6 +55,7 @@ dump-things-rebuild-index = "dump_things_service.commands.rebuild_index:main"
dump-things-copy-store = "dump_things_service.commands.copy_store:main"
dump-things-pid-check = "dump_things_service.commands.check_pids:main"
dump-things-create-merged-schema = "dump_things_service.commands.create_merged_schema:main"
dump-things-report-gitaudit = "dump_things_service.commands.report_gitaudit:main"

[tool.hatch.build.targets.wheel]
exclude = [