dump-things-pyclient/dump_things_pyclient/tests/test_auto_curate.py
Christian Monch 23714d4f8f
All checks were successful
Test execution / Test-all (push) Successful in 37s
add tests for --only-if-modifying in auto-curate
2026-03-31 17:05:16 +02:00

502 lines
17 KiB
Python

import json
import random
import subprocess
from itertools import chain
import pytest
from click.testing import CliRunner
from dump_things_pyclient.commands.dtc import cli
from dump_things_pyclient.tests.common import (
add_unique_records,
read_records_from_store,
)
prefix = 'https://www.example.com/ac_e2e_test/'
def _create_unique_records(offset: int, count: int) -> dict[int, dict]:
while True:
pids = tuple(set(random.randrange(offset, offset + 10000) for _ in range(count)))
if len(pids) == count:
break
return {
# NOTE: the order of keys is relevant because there are JSON-string
# comparisons in the change-set tests below.
pid: {
'schema_type': 'test:Person',
'pid': prefix + f'person_{pid}',
'family_name': f'grieg_{pid}',
'given_name': f'erwin_{pid}',
}
for pid in pids
}
def test_auto_curate_basic_end_to_end(dump_things_service):
    """Post records to an inbox, auto-curate them, and verify the result.

    After auto-curation of the inbox `test_user_1`, the inbox must be empty
    and every posted record must be present in the curated area.
    """
    port, store = dump_things_service
    service_url = f'http://127.0.0.1:{port}'
    posted_records = tuple(_create_unique_records(10000, 5).values())

    def read_curated_persons():
        # Index 2 of every store entry is the record content that is compared
        # against the posted records; annotations are stripped on read.
        return tuple(
            entry[2]
            for entry in read_records_from_store(
                store,
                class_name='Person',
                remove_keys=['annotations'],
            )
        )

    # Add records to inbox (one JSON document per line)
    json_lines = [
        json.dumps(record, ensure_ascii=False) for record in posted_records
    ]
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=user_1', 'post-records', service_url, 'collection_1', 'Person'],
        input='\n'.join(json_lines) + '\n',
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Ensure that the records do not yet exist in the curated area
    curated_before = read_curated_persons()
    for record in posted_records:
        assert record not in curated_before, 'record already exists, possibly a random number collision'

    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'test_user_1', service_url, 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'

    # Check that the inbox is empty
    inbox_after = tuple(
        entry[2]
        for entry in read_records_from_store(store, incoming='test_user_1')
    )
    assert inbox_after == tuple(), 'Inbox not clean after auto-curation'

    # Check that the records are in the curated area
    curated_after = read_curated_persons()
    for record in posted_records:
        assert record in curated_after
def test_auto_curate_create_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """Check the content of a change set created from a user's inbox.

    A first batch of records is posted and auto-curated. Modified versions of
    that batch, together with a second batch of new records, are then posted
    to the inbox and turned into a change set with `--create-change-set`.
    The change set directory is inspected with `git status` and `git diff`,
    its stored annotations are counted, and the inbox must be empty
    afterwards.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('create_change_set')
    new_records = _create_unique_records(20000, 5)
    new_curated_records = _create_unique_records(30000, 5)

    # Add new curated records to inbox and auto-curate them to move them to
    # the curated area.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=user_1', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_curated_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'test_user_1', f'http://127.0.0.1:{port}', 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'

    # Modify the records that were auto-curated and upload those together
    # with newly created records to the inbox.
    modified_curated_records = {
        # NOTE: the order of keys is relevant because there are JSON-string
        # comparisons in the change-set tests below.
        pid: {
            'schema_type': 'test:Person',
            'pid': record['pid'],
            'family_name': record['family_name'],
            'given_name': record['given_name'].replace('erwin', 'edvard'),
        }
        for pid, record in new_curated_records.items()
    }

    # Upload the modified (already curated) records and the new records to the
    # inbox.
    result = runner.invoke(
        cli,
        ['--token=user_1', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False)
            for record in chain(modified_curated_records.values(), new_records.values())
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Check that there are records in the incoming area of 'test_user_1'
    incoming_records = tuple(read_records_from_store(store=store, incoming='test_user_1'))
    assert incoming_records != tuple()

    # Create a change set
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'test_user_1',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'

    # Check the number of modified records in the change set. Status lines
    # that mention `annotations` are ignored here; annotations are checked
    # separately below.
    result = subprocess.run(
        ['git', 'status', '-s',],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [
        line.strip()
        for line in result.stdout.decode().splitlines()
        if 'annotations' not in line
    ]
    assert len(lines) == len(new_records) + len(modified_curated_records), f'unexpected number of modified records: {len(lines)}'
    assert all(line.startswith('M records/test_user_1') for line in lines), f'unexpected status for modified records: {lines}'

    # Check for expected diff content
    result = subprocess.run(
        ['git', 'diff', '-p',],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [line.strip() for line in result.stdout.decode().splitlines()]
    diffing_pids = [
        # Skip the 13 characters of 'diff --git a/', take the first path and
        # read its trailing five characters as the numeric record id.
        int(line[13:].split()[0][-5:])
        for line in lines if line.startswith('diff --git')
    ]
    assert all(map(lambda pid: pid in diffing_pids, new_records))
    assert all(map(lambda pid: pid in diffing_pids, modified_curated_records))

    # Check that annotations are stored in the change set
    annotations = {
        int(p.name[-5:]): json.loads(p.read_text())
        for p in (change_set_dir / 'annotations').glob('test_user_1/*')
    }
    # The inbox held the modified records plus the new records, so those two
    # sets determine the expected number of annotation files.
    assert len(annotations) == len(modified_curated_records) + len(new_records)

    # Check that all inboxes are empty
    incoming_records = tuple(read_records_from_store(store=store, incoming='test_user_1'))
    assert incoming_records == tuple()
def test_auto_curate_post_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """Create a change set from an inbox and post it, without and with annotations.

    Records are posted to the inbox of `test_user_1` and turned into a change
    set. The change set is first posted with `--post-change-set` only, after
    which the curated records must not carry annotations. It is then posted
    again with `--add-annotations`, after which the curated records must
    carry the annotations stored in the change set.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('post_change_set')
    new_records = _create_unique_records(40000, 5)

    # Add new records to inbox of 'test_user_1'.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=user_1', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Create a change set with the records from 'test_user_1's' inbox
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'test_user_1',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'

    # Post the changeset without annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set failed'

    # Check that new records have been posted without annotations
    curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(
                store,
                class_name='Person',
            )
        )
    )
    for record in new_records.values():
        assert record in curated_records
    for record in curated_records:
        # The store may contain records whose pid does not end in a number;
        # only the integer conversion can raise `ValueError`, so only that
        # statement is guarded.
        try:
            record_pid = int(record['pid'][-5:])
        except ValueError:
            continue
        if record_pid in new_records:
            assert 'annotations' not in record, f'unexpected annotations in {record}'

    # Post the changeset with annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            '--add-annotations',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set --add-annotations failed'

    # Check that new records have been posted with annotations
    curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(
                store,
                class_name='Person',
            )
        )
    )

    # Check that all record content has been posted. The projection keeps
    # only the content keys that a record actually has, so unrelated curated
    # records that lack one of these fields do not abort the test with a
    # `KeyError`.
    content_keys = ('schema_type', 'pid', 'family_name', 'given_name')
    cleaned_curated_records = tuple(
        {key: r[key] for key in content_keys if key in r}
        for r in curated_records
    )
    for record in new_records.values():
        assert record in cleaned_curated_records

    # Check that annotations were posted.
    annotations = {
        # NOTE(review): presumably this reverses the escaping that the
        # change-set writer applies to pids when building file names
        # ('/' -> '-_', '-' -> '--') -- confirm against the writer.
        p.name.replace('-_', '/').replace('--', '-'): json.loads(p.read_text())
        for p in change_set_dir.glob('annotations/test_user_1/*')
    }
    for record in curated_records:
        try:
            record_pid = int(record['pid'][-5:])
        except ValueError:
            continue
        if record_pid in new_records:
            assert 'annotations' in record, f'missing annotations in {record}'
            assert record['annotations'] == annotations[record['pid']]
def test_auto_curate_create_changeset_opt_in_end_to_end(dump_things_service, tmp_path_factory):
    """Check which inboxes end up in a change set with and without `-i`.

    The same records are posted to the inboxes of two users. Without `-i`
    the change set must contain a `records/` directory for both users; with
    `-i test_user_1` it must contain only the directory for `test_user_1`.
    """
    port, store = dump_things_service
    change_set_dir_both = tmp_path_factory.mktemp('create_change_set_both')
    change_set_dir_single = tmp_path_factory.mktemp('create_change_set_single')
    new_curated_records = _create_unique_records(30000, 5)

    runner = CliRunner()
    for path, opt_in, expected_paths in (
        # No `-i`: the change set covers both inboxes.
        (change_set_dir_both, None, {'test_user_1', 'test_user_2'}),
        # `-i test_user_1`: the change set covers a single inbox.
        (change_set_dir_single, 'test_user_1', {'test_user_1'}),
    ):
        # Post the records to the inboxes of both users.
        for token in ('user_1', 'user_2'):
            result = runner.invoke(
                cli,
                [f'--token={token}', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
                input='\n'.join(
                    json.dumps(record, ensure_ascii=False) for record in new_curated_records.values()
                ) + '\n'
            )
            assert result.exit_code == 0, f'dtc post-records with token {token} failed'

        # Create a change set
        result = runner.invoke(
            cli,
            [
                '--token=token-curator', 'auto-curate',
            ] + (
                ['-i', opt_in] if opt_in else []
            ) + [
                '--create-change-set', str(path),
                f'http://127.0.0.1:{port}', 'collection_1',
            ],
        )
        # Fail here with a clear message instead of a confusing directory
        # mismatch below if the command itself did not succeed.
        assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'

        # Check that the change set contains the expected directories.
        change_set_paths = set(
            map(
                lambda x: x.name,
                path.glob('records/*')
            )
        )
        assert change_set_paths == expected_paths
@pytest.mark.parametrize('create_changeset', [True, False])
def test_keep_inboxes(dump_things_service, tmp_path_factory, create_changeset):
    """Check that `--keep-inboxes` leaves the posted records in the inbox.

    Runs once with and once without `--create-change-set`; in both cases the
    records added to the inbox of `test_user_1` must still be readable from
    it after auto-curation.
    """
    port, store = dump_things_service

    # Optional change-set arguments, only present in the change-set variant.
    change_set_args = []
    if create_changeset:
        target_dir = tmp_path_factory.mktemp('create_changeset_keep_inboxes')
        change_set_args = ['--create-change-set', str(target_dir)]

    unique_records = add_unique_records(
        port,
        'collection_1',
        5,
        f'test_keep_inboxes_{create_changeset}',
        token='user_1',
    )

    command = ['--token=token-curator', 'auto-curate']
    command += change_set_args
    command += ['--keep-inboxes', f'http://127.0.0.1:{port}', 'collection_1']

    runner = CliRunner()
    result = runner.invoke(cli, command)
    assert result.exit_code == 0, 'auto-curate --keep-inboxes failed'

    # Read the inbox back and drop annotations before comparing.
    inbox_entries = read_records_from_store(
        store,
        collection='collection_1',
        incoming='test_user_1',
        class_name='Person',
    )
    cleaned_incoming_records = []
    for entry in inbox_entries:
        stored = entry[2]
        cleaned_incoming_records.append(
            {key: value for key, value in stored.items() if key != 'annotations'}
        )

    for expected in unique_records.values():
        assert expected in cleaned_incoming_records
def test_auto_curate_if_changes(dump_things_service, monkeypatch):
    """Check `auto-curate --only-if-modifying` with and without ignored annotations.

    A record is posted to the curated area, and a copy that differs only in
    its annotations is posted to a user inbox. With
    `--jsonpath-spec annotations` the auto-curation must report that it
    skipped the record; without it, no such message must be emitted.
    """
    from dump_things_pyclient.commands.dtc_plugins.auto_curate import console

    port, _ = dump_things_service

    # Use one shared constant for the positive and the negative assertion, so
    # that the negative check cannot pass merely because the two message
    # texts drifted apart.
    skip_message = (
        'skipping writing of record [green]test:auto_curate_if_changes[/green] '
        'because a matching record already exists'
    )

    # Capture everything that the auto-curate plugin prints to its console.
    print_calls = []
    monkeypatch.setattr(
        console,
        'print',
        lambda *args: print_calls.extend(args),
    )

    existing_record = {
        'pid': 'test:auto_curate_if_changes',
        'given_name': 'markus',
        'schema_type': 'test:Person',
        'annotations': {
            'https://submitter.example.com': 'submitter_1',
            'https://counter.example.com': '1',
        },
    }
    # Same content as the existing record, only the annotations differ.
    new_record = {
        **existing_record,
        'annotations': {
            'https://submitter.example.com': 'submitter_2',
            'https://counter.example.com': '2',
        },
    }

    runner = CliRunner()

    # Post the existing record directly into the curated area
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'post-records',
            '--curated',
            f'http://127.0.0.1:{port}', 'collection_1', '*',
        ],
        input=json.dumps(existing_record, ensure_ascii=False)
    )
    assert result.exit_code == 0
    print_calls.clear()

    # Post the new record to the users inbox
    result = runner.invoke(
        cli,
        [
            '--token=user_1',
            'post-records',
            f'http://127.0.0.1:{port}', 'collection_1', '*',
        ],
        input=json.dumps(new_record, ensure_ascii=False)
    )
    assert result.exit_code == 0
    print_calls.clear()

    # Try to auto-curate the new record with `--only-if-modifying`, ignoring
    # `annotations`. This should not post the record to the curated area, but
    # emit a message that the record was not posted
    # NOTE(review): `input` is kept from the original test; auto-curate
    # presumably does not read stdin -- confirm and drop if unused.
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--include', 'test_user_1',
            '--only-if-modifying',
            '--jsonpath-spec', 'annotations',
            '--keep-inboxes',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
        input=json.dumps(new_record, ensure_ascii=False)
    )
    assert result.exit_code == 0
    assert skip_message in print_calls
    print_calls.clear()

    # Auto-curate the new record again with `--only-if-modifying`, this time
    # without ignoring `annotations`. The record should be posted to curated,
    # because the annotations are different from the existing record.
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--include', 'test_user_1',
            '--only-if-modifying',
            '--keep-inboxes',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
        input=json.dumps(new_record, ensure_ascii=False)
    )
    assert result.exit_code == 0
    assert skip_message not in print_calls