Add option to skip posting of records to curated area, if a "similar" record already exists #37

Merged
cmo merged 7 commits from post-only-changed into master 2026-03-31 15:08:13 +00:00
9 changed files with 406 additions and 2 deletions

View file

@ -24,6 +24,7 @@ from ...communicate import (
incoming_read_labels, incoming_read_labels,
incoming_read_records, incoming_read_records,
) )
from .common.record_comparer import RecordComparer
subcommand_name = 'auto-curate' subcommand_name = 'auto-curate'
@ -80,7 +81,9 @@ stl_info = False
help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. ' help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. '
'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas ' 'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas '
'are added to the respective records before posting them to the destination (--post-change-set and ' 'are added to the respective records before posting them to the destination (--post-change-set and '
'--create-change-set are mutually exclusive)', '--create-change-set are mutually exclusive). If `--only-if-modifying` is provided, a record from the change '
'will only be posted, if an "identical" record is not already in the destination. Here "identical" means that the '
'records are identical if the attributes defined in `--jsonpath-spec` are ignored.',
type=click.Path( type=click.Path(
exists=True, exists=True,
file_okay=False, file_okay=False,
@ -138,6 +141,29 @@ stl_info = False
default=False, default=False,
is_flag=True, is_flag=True,
) )
@click.option(
'--only-if-modifying',
help='if provided a record will only be posted, if it does not yet exist '
'in the destination, or if the posted record modifies the existing '
'record (to ignore certain record entries, for example, `annotations`, '
'when comparing records, use the option --jsonpath-spec). NOTE: even if '
'a record is not posted, it will be removed from its inbox (unless '
'`--keep-inboxes` is provided).',
default=False,
is_flag=True,
)
@click.option(
'--jsonpath-spec',
metavar='IGNORE_SPEC',
help='a jsonpath-expression that defines record-attributes, that should be '
'ignored when checking for existing records. Every element that matches '
'the expression will be ignored when comparing records (for a '
'description of the specification syntax check the documentation of '
'https://pypi.org/project/jsonpath-ng/). For example, to ignore '
'`annotations` use `--jsonpath-spec "annotations"`.',
default=None,
is_flag=False,
)
@click.option( @click.option(
'--dry-run', '-d', '--dry-run', '-d',
help='if provided, do not alter any data, instead print what would be done', help='if provided, do not alter any data, instead print what would be done',
@ -161,6 +187,8 @@ def cli(
include, include,
list_labels, list_labels,
list_records, list_records,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
): ):
"""Automatically move records from the incoming areas of the collection """Automatically move records from the incoming areas of the collection
@ -197,6 +225,8 @@ def cli(
include, include,
list_labels, list_labels,
list_records, list_records,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
) )
) )
@ -224,6 +254,8 @@ def auto_curate(
include, include,
list_labels, list_labels,
list_records, list_records,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
): ):
curator_token = obj curator_token = obj
@ -384,6 +416,8 @@ def auto_curate(
author_id=author_id, author_id=author_id,
pid=pid, pid=pid,
add_annotations=add_annotations, add_annotations=add_annotations,
only_if_modifying=only_if_modifying,
jsonpath_spec=jsonpath_spec,
dry_run=dry_run, dry_run=dry_run,
session=session, session=session,
) )
@ -403,6 +437,8 @@ def auto_curate(
author_id=author_id, author_id=author_id,
pid=pid, pid=pid,
keep_inboxes=keep_inboxes, keep_inboxes=keep_inboxes,
only_if_modifying=only_if_modifying,
jsonpath_spec=jsonpath_spec,
dry_run=dry_run, dry_run=dry_run,
session=session, session=session,
) )
@ -429,9 +465,19 @@ def _curate_records(
author_id: str | None, author_id: str | None,
pid: str | None, pid: str | None,
keep_inboxes: bool, keep_inboxes: bool,
only_if_modifying: bool,
jsonpath_spec: str | None,
dry_run: bool, dry_run: bool,
session: Session, session: Session,
) -> int: ) -> int:
record_comparer = None
if only_if_modifying:
record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
else:
if jsonpath_spec:
console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')
for record, _, _, _, _ in source: for record, _, _, _, _ in source:
if pid: if pid:
if record['pid'] not in pid: if record['pid'] not in pid:
@ -445,6 +491,19 @@ def _curate_records(
console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.') console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.')
continue continue
if record_comparer:
existing_record = curated_read_record_with_pid(
service_url=service_url,
collection=collection,
pid=record['pid'],
token=curator_token,
session=session,
)
if existing_record:
if record_comparer.is_equal(existing_record, record):
console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
continue
if dry_run: if dry_run:
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"') console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
else: else:
@ -654,10 +713,20 @@ def _post_change_set(
author_id: str | None, author_id: str | None,
pid: str | None, pid: str | None,
add_annotations: bool, add_annotations: bool,
only_if_modifying: bool,
jsonpath_spec: str | None,
dry_run: bool, dry_run: bool,
session: Session, session: Session,
): ):
pid_file_name = _pid_to_filename(pid) if pid else None pid_file_name = _pid_to_filename(pid) if pid else None
record_comparer = None
if only_if_modifying:
record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
else:
if jsonpath_spec:
console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')
for file_name in source: for file_name in source:
if pid_file_name and pid_file_name != file_name: if pid_file_name and pid_file_name != file_name:
@ -685,6 +754,19 @@ def _post_change_set(
console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record["pid"]}[/yellow], ignoring it.') console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record["pid"]}[/yellow], ignoring it.')
continue continue
if record_comparer:
existing_record = curated_read_record_with_pid(
service_url=destination_service_url,
collection=destination_collection,
pid=record['pid'],
token=destination_token,
session=session,
)
if existing_record:
if record_comparer.is_equal(existing_record, record):
console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
continue
if dry_run: if dry_run:
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"') console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
continue continue

View file

@ -0,0 +1,54 @@
from __future__ import annotations
from jsonpath_ng import (
parse,
jsonpath,
)
JSON = dict[str, 'JSON'] | list['JSON'] | int | str | float | bool | None
class RecordComparer:
"""Compare dictionaries, ignoring specified elements
`RecordComparer` compares two dictionaries, ignoring the elements that
are specified by `jsonpath_spec`.
The comparator uses `jsonpath-ng` (`https://pypi.org/project/jsonpath-ng/`).
All elements that match the expression in `jsonpath_spec` will be removed
before comparing two records (the specification language is defined in the
jsonpath-ng documentation).
Given the dictionary `r = {'key_1': {'key_1_1': 11, 'key_1_2': 22}}`, the
specification `'key_1.key_1_2'` matches `r['key_1']['key_1_2']`. This
entry will be ignored during comparison. Therefore, the two records:
`r1 = {'key_1': {'key_1_1': 11, 'key_1_2': 22}}`
`r2 = {'key_1': {'key_1_1': 11, 'key_1_2': 44}}`
with `jsonpath_spec`: `key_1.key_1_2`, `r1` and `r2` are considered to
be equal. For example, to ignore `annotations` when comparing two
dictionaries, `jsonpath_spec` should be `'annotations'`.
"""
def __init__(
self,
*,
jsonpath_spec: str | None = None,
):
self.expr = None if jsonpath_spec is None else parse(jsonpath_spec)
def is_equal(
self,
record_a: dict,
record_b: dict,
) -> bool:
if self.expr:
return self.expr.filter(
lambda _: True,
record_a,
) == self.expr.filter(
lambda _: True,
record_b,
)
return record_a == record_b

View file

@ -0,0 +1,28 @@
from ..record_comparer import RecordComparer
def test_example():
    """Records that differ only in the ignored nested key compare as equal."""
    jsonpath_spec = 'key_1.key_1_2'
    comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
    r1 = {'key_1': {'key_1_1': 11, 'key_1_2': 22}}
    r2 = {'key_1': {'key_1_1': 11, 'key_1_2': 44}}
    # The failure message was previously cut off in the middle of a word
    assert comparer.is_equal(r1, r2) is True, (
        f'Unexpected non-equal result for {r1!r} =({jsonpath_spec})= {r2!r}'
    )
def test_index():
    """An indexed spec ignores `b` in the second list element only."""
    with_b = {'a': [{'b': 1, 'x': 2}, {'b': 3, 'y': 4}]}
    without_b = {'a': [{'b': 1, 'x': 2}, {'y': 4}]}
    assert RecordComparer(jsonpath_spec='a.[1].b').is_equal(with_b, without_b)
def test_wildcard_index():
    """A wildcard index spec ignores `b` in every element of the list."""
    with_b = {'a': [{'b': 1, 'x': 2}, {'b': 3, 'y': 4}]}
    without_b = {'a': [{'x': 2}, {'y': 4}]}
    assert RecordComparer(jsonpath_spec='a.[*].b').is_equal(with_b, without_b)

View file

@ -11,9 +11,10 @@ from ...communicate import (
HTTPError, HTTPError,
curated_write_record, curated_write_record,
collection_write_record, collection_write_record,
get_session, get_session, curated_read_record_with_pid, incoming_read_record_with_pid,
) )
from .common.prefix import de_prefix from .common.prefix import de_prefix
from .common.record_comparer import RecordComparer
subcommand_name = 'post-records' subcommand_name = 'post-records'
@ -55,6 +56,28 @@ console = Console(file=sys.stderr)
is_flag=True, is_flag=True,
help='ignore errors when posting a record and continue with remaining records', help='ignore errors when posting a record and continue with remaining records',
) )
@click.option(
'--only-if-modifying',
help='if provided, and if `--curated` is provided, a record will only be '
'posted, if it does not yet exist '
'in the destination, or if the posted record modifies the existing '
'record (to ignore certain record entries, for example, `annotations`, '
'when comparing records, use the option --jsonpath-spec).',
default=False,
is_flag=True,
)
@click.option(
'--jsonpath-spec',
metavar='IGNORE_SPEC',
help='a jsonpath-expression that defines record-attributes, that should be '
'ignored when checking for existing records. Every element that matches '
'the expression will be ignored when comparing records (for a '
'description of the specification syntax check the documentation of '
'https://pypi.org/project/jsonpath-ng/). For example, to ignore '
'`annotations` use `--jsonpath-spec "annotations"`.',
default=None,
is_flag=False,
)
@click.option( @click.option(
'--dry-run', '-d', '--dry-run', '-d',
help='if provided, do not alter any data, instead print what would be done', help='if provided, do not alter any data, instead print what would be done',
@ -69,6 +92,8 @@ def cli(
curated, curated,
author_id, author_id,
ignore_errors, ignore_errors,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
): ):
"""Read records of class CLASS from standard input and store them in """Read records of class CLASS from standard input and store them in
@ -104,6 +129,8 @@ def cli(
curated, curated,
author_id, author_id,
ignore_errors, ignore_errors,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
) )
) )
@ -117,6 +144,8 @@ def post_records(
curated, curated,
author_id, author_id,
ignore_errors, ignore_errors,
only_if_modifying,
jsonpath_spec,
dry_run, dry_run,
) -> int: ) -> int:
token = obj token = obj
@ -131,6 +160,16 @@ def post_records(
write_record = collection_write_record write_record = collection_write_record
keyword_args = {} keyword_args = {}
record_comparer = None
if only_if_modifying:
if not curated:
console.print('[yellow]Warning[/yellow]: ignoring --only-if-modifying because --curated was not provided')
else:
record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
else:
if jsonpath_spec:
console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')
failed = [] failed = []
session = get_session() session = get_session()
for index, line in zip(count(), track(sys.stdin, console=console)): for index, line in zip(count(), track(sys.stdin, console=console)):
@ -153,6 +192,19 @@ def post_records(
else: else:
class_name = cls class_name = cls
if record_comparer:
existing_record = curated_read_record_with_pid(
service_url=service_url,
collection=collection,
pid=record['pid'],
token=token,
session=session,
)
if existing_record:
if record_comparer.is_equal(existing_record, record):
console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
continue
if dry_run: if dry_run:
if curated: if curated:
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to curated area of collection "{collection}" on "{service_url}"') console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to curated area of collection "{collection}" on "{service_url}"')

View file

@ -402,3 +402,101 @@ def test_keep_inboxes(dump_things_service, tmp_path_factory, create_changeset):
] ]
for record in unique_records.values(): for record in unique_records.values():
assert record in cleaned_incoming_records assert record in cleaned_incoming_records
def test_auto_curate_if_changes(dump_things_service, monkeypatch):
    """Check `auto-curate --only-if-modifying` with and without `--jsonpath-spec`.

    A record is stored in the curated area and a variant that differs only
    in its `annotations` is posted to a user inbox. Auto-curating with
    `--jsonpath-spec annotations` must report that the record was skipped;
    auto-curating without the spec must not report that.
    """
    from dump_things_pyclient.commands.dtc_plugins.auto_curate import console

    port, _ = dump_things_service
    service_url = f'http://127.0.0.1:{port}'
    skip_message = (
        'skipping writing of record [green]test:auto_curate_if_changes[/green] '
        'because a matching record already exists'
    )

    # Collect everything that the command prints to its console
    printed = []
    monkeypatch.setattr(
        console,
        'print',
        lambda *args: printed.extend(args),
    )

    existing_record = {
        'pid': 'test:auto_curate_if_changes',
        'given_name': 'markus',
        'schema_type': 'test:Person',
        'annotations': {
            'https://submitter.example.com': 'submitter_1',
            'https://counter.example.com': '1',
        },
    }
    # Same record, but with different annotations
    new_record = {
        **existing_record,
        'annotations': {
            'https://submitter.example.com': 'submitter_2',
            'https://counter.example.com': '2',
        },
    }

    runner = CliRunner()

    def run(arguments, record):
        # Start every invocation with an empty capture list
        printed.clear()
        outcome = runner.invoke(
            cli,
            arguments,
            input=json.dumps(record, ensure_ascii=False),
        )
        assert outcome.exit_code == 0

    # Post the existing record directly into the curated area
    run(
        [
            '--token=token-curator',
            'post-records',
            '--curated',
            service_url, 'collection_1', '*',
        ],
        existing_record,
    )

    # Post the new record to the user's inbox
    run(
        [
            '--token=user_1',
            'post-records',
            service_url, 'collection_1', '*',
        ],
        new_record,
    )

    # Auto-curate with `--only-if-modifying`, ignoring `annotations`. The
    # record must not be posted to the curated area, and a message that the
    # record was skipped must be emitted.
    run(
        [
            '--token=token-curator',
            'auto-curate',
            '--include', 'test_user_1',
            '--only-if-modifying',
            '--jsonpath-spec', 'annotations',
            '--keep-inboxes',
            service_url, 'collection_1',
        ],
        new_record,
    )
    assert skip_message in printed

    # Auto-curate with `--only-if-modifying` but without a spec. The
    # annotations differ from the existing record, so no skip message is
    # expected.
    run(
        [
            '--token=token-curator',
            'auto-curate',
            '--include', 'test_user_1',
            '--only-if-modifying',
            '--keep-inboxes',
            service_url, 'collection_1',
        ],
        new_record,
    )
    assert skip_message not in printed

View file

@ -221,3 +221,81 @@ def test_dtc_post_record_any_class(dump_things_service):
) )
assert all(r in cleaned_disk_record for r in document_records) assert all(r in cleaned_disk_record for r in document_records)
def test_dtc_post_records_if_changes(dump_things_service, monkeypatch):
    """Check `post-records --curated --only-if-modifying`.

    A record is stored in the curated area. A variant that differs only in
    its `annotations` is then posted twice: without `--jsonpath-spec` no
    skip message may be emitted, with `--jsonpath-spec annotations` the
    skip message must be emitted.
    """
    from dump_things_pyclient.commands.dtc_plugins.post_records import console

    port, _ = dump_things_service
    service_url = f'http://127.0.0.1:{port}'
    skip_message = (
        'skipping writing of record [green]test:post_records_if_changes[/green] '
        'because a matching record already exists'
    )

    # Collect everything that the command prints to its console
    printed = []
    monkeypatch.setattr(
        console,
        'print',
        lambda *args: printed.extend(args),
    )

    existing_record = {
        'pid': 'test:post_records_if_changes',
        'given_name': 'klaus',
        'schema_type': 'test:Person',
        'annotations': {
            'https://submitter.example.com': 'submitter_1',
            'https://counter.example.com': '1',
        },
    }
    # Same record, but with different annotations
    new_record = {
        **existing_record,
        'annotations': {
            'https://submitter.example.com': 'submitter_2',
            'https://counter.example.com': '2',
        },
    }

    runner = CliRunner()

    def run(arguments, record):
        # Start every invocation with an empty capture list
        printed.clear()
        outcome = runner.invoke(
            cli,
            arguments,
            input=json.dumps(record, ensure_ascii=False),
        )
        assert outcome.exit_code == 0

    # Store the existing record in the curated area
    run(
        [
            '--token=token-curator',
            'post-records',
            '--curated',
            service_url, 'collection_1', '*',
        ],
        existing_record,
    )

    # Post the new record with `--only-if-modifying` and no spec. Its
    # annotations differ from the existing record, so no skip message is
    # expected.
    run(
        [
            '--token=token-curator',
            'post-records',
            '--curated',
            '--only-if-modifying',
            service_url, 'collection_1', '*',
        ],
        new_record,
    )
    assert skip_message not in printed

    # Post the new record with `--only-if-modifying`, ignoring `annotations`.
    # The remaining content matches the existing record, so the skip message
    # is expected.
    run(
        [
            '--token=token-curator',
            'post-records',
            '--curated',
            '--only-if-modifying',
            '--jsonpath-spec', 'annotations',
            service_url, 'collection_1', '*',
        ],
        new_record,
    )
    assert skip_message in printed

View file

@ -9,6 +9,7 @@ authors = [
] ]
dependencies = [ dependencies = [
"click", "click",
"jsonpath-ng",
"pyyaml", "pyyaml",
"requests", "requests",
"rich-click", "rich-click",

11
uv.lock generated
View file

@ -421,6 +421,7 @@ version = "0.2.16"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "click" }, { name = "click" },
{ name = "jsonpath-ng" },
{ name = "pyyaml" }, { name = "pyyaml" },
{ name = "requests" }, { name = "requests" },
{ name = "rich-click" }, { name = "rich-click" },
@ -449,6 +450,7 @@ requires-dist = [
{ name = "click" }, { name = "click" },
{ name = "dump-things-service", marker = "extra == 'tests'", specifier = ">=5.6.1" }, { name = "dump-things-service", marker = "extra == 'tests'", specifier = ">=5.6.1" },
{ name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.6.1" }, { name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.6.1" },
{ name = "jsonpath-ng" },
{ name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" }, { name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" },
{ name = "pyyaml" }, { name = "pyyaml" },
{ name = "requests" }, { name = "requests" },
@ -1058,6 +1060,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/90/0d93963711f811efe528e3cead2f2bfb78c196df74d8a24fe8d655288e50/jsonasobj2-1.0.4-py3-none-any.whl", hash = "sha256:12e86f86324d54fcf60632db94ea74488d5314e3da554c994fe1e2c6f29acb79", size = 6324, upload-time = "2021-06-02T17:43:27.126Z" }, { url = "https://files.pythonhosted.org/packages/e5/90/0d93963711f811efe528e3cead2f2bfb78c196df74d8a24fe8d655288e50/jsonasobj2-1.0.4-py3-none-any.whl", hash = "sha256:12e86f86324d54fcf60632db94ea74488d5314e3da554c994fe1e2c6f29acb79", size = 6324, upload-time = "2021-06-02T17:43:27.126Z" },
] ]
[[package]]
name = "jsonpath-ng"
version = "1.8.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/32/58/250751940d75c8019659e15482d548a4aa3b6ce122c515102a4bfdac50e3/jsonpath_ng-1.8.0.tar.gz", hash = "sha256:54252968134b5e549ea5b872f1df1168bd7defe1a52fed5a358c194e1943ddc3", size = 74513, upload-time = "2026-02-24T14:42:06.182Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/03/99/33c7d78a3fb70d545fd5411ac67a651c81602cc09c9cf0df383733f068c5/jsonpath_ng-1.8.0-py3-none-any.whl", hash = "sha256:b8dde192f8af58d646fc031fac9c99fe4d00326afc4148f1f043c601a8cfe138", size = 67844, upload-time = "2026-02-28T00:53:19.637Z" },
]
[[package]] [[package]]
name = "jsonpointer" name = "jsonpointer"
version = "3.0.0" version = "3.0.0"