dump-things-pyclient/dump_things_pyclient/commands/dtc_plugins/auto_curate.py
Christian Monch 23714d4f8f
All checks were successful
Test execution / Test-all (push) Successful in 37s
add tests for --only-if-modifying in auto-curate
2026-03-31 17:05:16 +02:00

814 lines
29 KiB
Python

import json
import logging
import re
import subprocess
import sys
from itertools import chain
from pathlib import Path
from typing import (
cast,
Iterable,
)
import rich_click as click
from requests import Session
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
curated_read_record_with_pid,
curated_write_record,
get_session,
incoming_delete_record,
incoming_read_labels,
incoming_read_records,
)
from .common.record_comparer import RecordComparer
# Name of this sub-command (presumably read by the plugin loader of the
# command line client -- TODO confirm against the loader).
subcommand_name = 'auto-curate'

logger = logging.getLogger('auto-curate')

# Diagnostics, warnings, and progress bars are written to stderr; results
# (label lists, record lists, the change set path) are emitted separately via
# `click.echo`.
console = Console(file=sys.stderr)

# Set to True by `_get_class_name` once the long explanation about the
# "Schema Type Layer" has been printed, so that it is shown at most once.
stl_info = False
@click.command(short_help='Move records from inbox to curate area of a collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.option(
    '--destination-service-url',
    metavar='DEST_SERVICE_URL',
    help='select a different dump-thing-service, i.e. not SERVICE_URL, as destination for auto-curated records (the default is SERVICE_URL)',
)
@click.option(
    '--destination-collection',
    metavar='DEST_COLLECTION',
    help='select a different collection, i.e. not COLLECTION, as destination for auto-curated records',
)
@click.option(
    '--destination-token',
    metavar='DEST_TOKEN',
    help='if provided, this token will be used to authenticate against DEST_SERVICE_URL, which defaults to SERVICE_URL (the default is the token provided via --token)',
)
@click.option(
    '--create-change-set',
    metavar='CHANGE_SET_DIR',
    help='move inbox content into a change set in the specified directory instead of modifying data on a server. A change set contains all changes that would be performed in the destination collection (as git-repo with modifications). '
         '`annotations` are not included in the modifications, but are stored separately in the change set directory. If the directory does not exist, it will be created. Specifying an existing directory that is not empty is an error. '
         'NOTE: the records will be moved into the change set, i.e., they will be removed from the inboxes, unless `--keep-inboxes` is specified.',
    type=click.Path(
        exists=False,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
)
@click.option(
    '--post-change-set',
    metavar='CHANGE_SET_DIR',
    help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. '
         'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas '
         'are added to the respective records before posting them to the destination (--post-change-set and '
         '--create-change-set are mutually exclusive). If `--only-if-modifying` is provided, a record from the change '
         'will only be posted, if an "identical" record is not already in the destination. Here "identical" means that the '
         'records are identical if the attributes defined in `--jsonpath-spec` are ignored.',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
)
@click.option(
    '--keep-inboxes',
    help='do not remove records from inbox when auto-curating them. By default `auto-curate` (with and without `--create-change-set`) '
         'removes records from the processed inboxes. This flag prevents this removal.',
    default=False,
    is_flag=True,
)
@click.option(
    '--add-annotations',
    help='add annotations when posting a change set via `--post-change-set`. By default the annotations will not be added to the records that are posted. If --post-change-set is not provided, this flag is ignored',
    default=False,
    is_flag=True,
)
@click.option(
    '--author_id',
    metavar='AUTHOR_ID',
    default=None,
    help='specify an author ID that will be stored in the audit log (AUTHOR_ID must not contain whitespace).',
)
@click.option(
    '--pid', '-p',
    metavar='PID',
    help='if provided, process only records that match the given PIDs. NOTE: matching does not involve CURIE-resolution',
)
@click.option(
    '--exclude', '-e',
    metavar='INBOX_LABEL',
    help='exclude an inbox on the source collection (repeatable)',
    multiple=True,
)
@click.option(
    '--include', '-i',
    metavar='INBOX_LABEL',
    help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
    multiple=True,
)
@click.option(
    '--list-labels', '-l',
    help='list the inbox labels of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--list-records', '-r',
    help='list records in the inboxes of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--only-if-modifying',
    help='if provided a record will only be posted, if it does not yet exist '
         'in the destination, or if the posted record modifies the existing '
         'record (to ignore certain record entries, for example, `annotations`, '
         'when comparing records, use the option --jsonpath-spec). NOTE: even if '
         'a record is not posted, it will be removed from its inbox (unless '
         '`--keep-inboxes` is provided).',
    default=False,
    is_flag=True,
)
@click.option(
    '--jsonpath-spec',
    metavar='IGNORE_SPEC',
    help='a jsonpath-expression that defines record-attributes, that should be '
         'ignored when checking for existing records. Every element that matches '
         'the expression will be ignored when comparing records (for a '
         'description of the specification syntax check the documentation of '
         'https://pypi.org/project/jsonpath-ng/). For example, to ignore '
         '`annotations` use `--jsonpath-spec "annotations"`.',
    default=None,
    is_flag=False,
)
@click.option(
    '--dry-run', '-d',
    help='if provided, do not alter any data, instead print what would be done',
    default=False,
    is_flag=True,
)
def cli(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    create_change_set,
    post_change_set,
    keep_inboxes,
    add_annotations,
    author_id,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    only_if_modifying,
    jsonpath_spec,
    dry_run,
):
    """Automatically move records from the incoming areas of the collection
    COLLECTION in the service SERVICE_URL to the curated area of the same
    collection, or to the curated area of another collection, possibly on
    another service.

    `auto-curate` can create change sets that reflect what changes would be
    applied to the destination, if records in inboxes were posted to the
    destination. The changes are reflected as worktree diff in a git
    repository (annotations are excluded from the diff).

    `auto-curate` can also read records from a change set and post them
    to a service. If required, annotations can be re-added before the records
    from the change set are posted to the service.

    A token is required and will be used to authenticate the requests.
    The token must have curator-rights."""
    # NOTE: the docstring above is the help text that click shows to the user,
    # so implementation notes live in comments instead.
    #
    # This function is only a thin adapter: it forwards all parsed options to
    # `auto_curate`, turns its integer result into the process exit status,
    # and converts an `HTTPError` that escapes from it into an error message
    # plus exit status 1. `obj` is the click context object; `auto_curate`
    # uses it as the curator token.
    try:
        sys.exit(
            auto_curate(
                obj,
                service_url,
                collection,
                destination_service_url,
                destination_collection,
                destination_token,
                create_change_set,
                post_change_set,
                keep_inboxes,
                add_annotations,
                author_id,
                pid,
                exclude,
                include,
                list_labels,
                list_records,
                only_if_modifying,
                jsonpath_spec,
                dry_run,
            )
        )
    except HTTPError as e:
        console.print(
            f'[red]Error[/red]: {e}: {e.response.text}',
        )
        sys.exit(1)
def auto_curate(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    create_change_set,
    post_change_set,
    keep_inboxes,
    add_annotations,
    author_id,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    only_if_modifying,
    jsonpath_spec,
    dry_run,
):
    """Run the `auto-curate` command and return a process exit status.

    Depending on the options, one of the following is done for every
    selected inbox label:

    - `list_labels`: collect the label names and print them as JSON list.
    - `list_records`: collect the inbox records and print them as JSON
      object that maps labels to record lists.
    - `create_change_set`: write the inbox records into a change set
      directory (a git repository), see `_update_change_set`.
    - `post_change_set`: post the records of an existing change set to the
      destination, see `_post_change_set`.
    - otherwise: post inbox records directly to the destination, see
      `_curate_records`.

    Labels are selected by `include` (if non-empty, only these labels are
    used) and `exclude` (applied afterwards). `destination_service_url`,
    `destination_collection`, and `destination_token` default to
    `service_url`, `collection`, and the curator token `obj`.

    Returns 0 on success and a non-zero value if arguments are inconsistent
    or if processing a label failed. `HTTPError` raised while reading labels
    or records is not handled here.
    """
    curator_token = obj
    if curator_token is None:
        console.print('[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)')
        return 1

    if create_change_set:
        if post_change_set:
            console.print('[red]Error[/red]: --create-change-set and --post-change-set must not be specified together')
            return 1
        if list_labels or list_records:
            console.print('[red]Error[/red]: --create-change-set cannot be combined with --list-labels or --list-records')
            return 1

        if create_change_set.exists():
            if any(create_change_set.iterdir()):
                console.print(f'[red]Error[/red]: {create_change_set} already exists and is not empty')
                return 1
        elif dry_run:
            console.print(f'[DRY_RUN]:CREATE DIRECORY: {create_change_set}')
        else:
            # Also create missing parents, the help text promises that a
            # non-existing directory will be created.
            create_change_set.mkdir(parents=True, exist_ok=True)

        if dry_run:
            console.print(f'[DRY_RUN]:INITIALIZING GIT REPO: {create_change_set}')
        elif not _run_git(['init', str(create_change_set)]):
            return 1

    if post_change_set:
        if list_labels or list_records:
            console.print('[red]Error[/red]: --post-change-set cannot be combined with --list-labels or --list-records')
            return 1

    if destination_collection is None:
        destination_collection = collection
    if destination_service_url is None:
        destination_service_url = service_url
    if destination_token is None:
        destination_token = curator_token

    session = get_session()
    output = None

    # If --list-labels and --list-records are provided, keep only the latter,
    # because it includes listing of labels
    if list_records:
        if list_labels:
            console.print('[yellow]Warning[/yellow]: `-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
            list_labels = False
        output = {}
    if list_labels:
        output = []

    # Get the labels from the service, or from a change set that should be posted.
    if post_change_set:
        all_labels = _get_change_set_labels(post_change_set)
    else:
        all_labels = incoming_read_labels(
            service_url=service_url,
            collection=collection,
            token=curator_token,
            session=session,
        )

    for label in all_labels:
        if include and label not in include:
            logger.debug('ignoring non-included incoming label: %s', label)
            continue
        if label in exclude:
            logger.debug('ignoring excluded incoming label: %s', label)
            continue

        if list_labels:
            output.append(label)
            continue

        # Get the records from the service, or from a change set that should
        # be posted.
        if post_change_set:
            record_source = _get_change_set_records(post_change_set, label)
            final_record_source = track(
                record_source,
                description=f'processing [green]{label}[/green]',
                total=len(record_source),
                console=console,
            )
        else:
            record_source = incoming_read_records(
                service_url=service_url,
                collection=collection,
                label=label,
                token=curator_token,
                session=session,
            )
            # Get the first entry to find the total number of records in the
            # incoming area `label`.
            try:
                first_record, _, _, _, total = next(record_source)
            except StopIteration:
                console.print(f'no records in incoming area [green]{label}[/green], skipping it')
                continue
            # Create a source that returns the first entry, all remaining entries,
            # and tracks progress.
            final_record_source = track(
                chain(
                    [(first_record, None, None, None, None)],
                    cast(Iterable, record_source),
                ),
                description=f'processing [green]{label}[/green]',
                total=total,
                console=console,
            )

        if create_change_set:
            result = _update_change_set(
                source=final_record_source,
                change_set=create_change_set,
                label=label,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                pid=pid,
                keep_inboxes=keep_inboxes,
                dry_run=dry_run,
                session=session,
                # The inbox lives on the source side, which may differ from
                # the destination.
                service_url=service_url,
                collection=collection,
                curator_token=curator_token,
            )
            if result != 0:
                return result
        elif list_records:
            output[label] = [record for record, _, _, _, _, in final_record_source]
            continue
        elif post_change_set:
            result = _post_change_set(
                final_record_source,
                post_change_set=post_change_set,
                label=label,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                author_id=author_id,
                pid=pid,
                add_annotations=add_annotations,
                only_if_modifying=only_if_modifying,
                jsonpath_spec=jsonpath_spec,
                dry_run=dry_run,
                session=session,
            )
            if result != 0:
                return result
        else:
            result = _curate_records(
                final_record_source,
                service_url=service_url,
                collection=collection,
                label=label,
                curator_token=curator_token,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                author_id=author_id,
                pid=pid,
                keep_inboxes=keep_inboxes,
                only_if_modifying=only_if_modifying,
                jsonpath_spec=jsonpath_spec,
                dry_run=dry_run,
                session=session,
            )
            if result != 0:
                return result

    if create_change_set:
        click.echo(str(create_change_set))
    elif output is not None:
        click.echo(json.dumps(output, ensure_ascii=False))
    return 0


def _run_git(
    arguments: list[str],
    cwd: Path | None = None,
) -> bool:
    """Run `git` with `arguments` (optionally in `cwd`), report failures.

    Returns True if git exited with status 0. If git cannot be started or
    exits with a non-zero status, an error is printed and False is returned.
    """
    try:
        subprocess.run(
            ['git', *arguments],
            capture_output=True,
            cwd=cwd,
            check=True,
        )
    except FileNotFoundError:
        console.print('[red]Error[/red]: could not execute `git`, is it installed?')
        return False
    except subprocess.CalledProcessError as e:
        console.print(f'[red]Error[/red]: `git {arguments[0]}` failed with exit status {e.returncode}')
        details = e.stderr.decode(errors='replace').strip() if e.stderr else ''
        if details:
            # git output is arbitrary text, do not interpret it as markup
            console.print(details, markup=False)
        return False
    return True


def _pid_selected(
    record_pid: str,
    pid,
) -> bool:
    """Check whether a record with `record_pid` passes the `pid` filter.

    An empty filter (`None`, `''`, empty collection) selects every record.
    A string filter must be equal to `record_pid` (a plain `in` test would
    be a substring test and select too many records). Any other filter is
    treated as a collection of PIDs.
    """
    if not pid:
        return True
    if isinstance(pid, str):
        return record_pid == pid
    return record_pid in pid


def _curate_records(
    source: Iterable,
    service_url: str,
    collection: str,
    label: str,
    curator_token: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    author_id: str | None,
    pid: str | None,
    keep_inboxes: bool,
    only_if_modifying: bool,
    jsonpath_spec: str | None,
    dry_run: bool,
    session: Session,
) -> int:
    """Post inbox records to the destination and remove them from the inbox.

    `source` yields 5-tuples whose first element is the record. Records are
    written to the curated area of `destination_collection` on
    `destination_service_url`. Unless `keep_inboxes` is set, every processed
    record is then deleted from inbox `label` of `collection` on
    `service_url`.

    If `only_if_modifying` is set, a record is not written if the destination
    already holds a record with the same pid that `RecordComparer` (which
    ignores the elements matched by `jsonpath_spec`) considers equal. Such a
    record is still removed from the inbox, as the `--only-if-modifying`
    help text states.

    Records that do not pass the `pid` filter, and records without a usable
    class name, are ignored and stay in the inbox. With `dry_run` nothing is
    written or deleted; the intended actions are printed instead.

    Returns 0 on success, 1 as soon as a write or delete request fails.
    """
    record_comparer = None
    if only_if_modifying:
        record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
    elif jsonpath_spec:
        console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')

    for record, _, _, _, _ in source:
        if not _pid_selected(record['pid'], pid):
            logger.debug(
                'ignoring record with non-matching pid: %s',
                record['pid'])
            continue

        class_name = _get_class_name(record)
        if class_name is None:
            console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.')
            continue

        write_required = True
        if record_comparer is not None:
            # Compare with the record in the destination, that is where the
            # record would be written to.
            existing_record = curated_read_record_with_pid(
                service_url=destination_service_url,
                collection=destination_collection,
                pid=record['pid'],
                token=destination_token,
                session=session,
            )
            if existing_record and record_comparer.is_equal(existing_record, record):
                console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
                write_required = False

        if write_required:
            if dry_run:
                console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
            else:
                # Store record in destination collection
                try:
                    curated_write_record(
                        service_url=destination_service_url,
                        collection=destination_collection,
                        class_name=class_name,
                        record=record,
                        author_id=author_id,
                        token=destination_token,
                        session=session,
                    )
                except HTTPError as e:
                    console.print(
                        f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
                    )
                    return 1

        if keep_inboxes:
            continue

        if dry_run:
            console.print(f'[DRY_RUN]:DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
            continue

        # Delete record from incoming area
        try:
            incoming_delete_record(
                service_url=service_url,
                collection=collection,
                label=label,
                pid=record['pid'],
                token=curator_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
            )
            return 1
    return 0


def _get_class_name(record: dict) -> str | None:
    """Get the class name from the `schema_type` attribute.

    The class name is the trailing run of letters, digits, and underscores of
    `schema_type`, e.g. `Person` for `abc:Person`.

    This requires that the schema type is either stored in the record or that
    the store has a "Schema Type Layer", i.e., the store type is
    `record_dir+stl`, or `sqlite+stl`.

    Returns None and prints a warning if `schema_type` is missing, is not a
    string, or does not end in a class name. The detailed explanation about
    the "Schema Type Layer" is printed only once per process.
    """
    global stl_info

    schema_type = record.get('schema_type')
    record_pid = record.get('pid')

    if isinstance(schema_type, str):
        # The pattern can match the empty string, e.g. for `abc:`. An empty
        # class name is as unusable as a missing one.
        class_name = re.search(r'[_A-Za-z0-9]*$', schema_type).group(0)
        if class_name:
            return class_name
        console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record_pid}[/yellow] because no class name could be derived from its `schema_type` attribute.')
        return None

    console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record_pid}[/yellow] because `schema_type` attribute is missing.')
    if not stl_info:
        console.print(
            ' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
            'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
            '"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
            'storage and will not be returned when retrieving records from the incoming area.'
            'dump-things-service circumvents the "Schema Type Layer" on curator access, i.e., '
            'on access to incoming areas and on access to the curated area. Therefore it '
            'might return records without `schema_type` attributes on curator access to '
            'incoming areas or curated areas. To be consistent, it might be a good idea to '
            'NOT use a "Schema Type Layer" in collections that shall be auto-curated, but '
            'store `schema_type`-attributes in all records.[/yellow]',
        )
        stl_info = True
    return None


def _update_change_set(
    source: Iterable,
    change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    pid: str | None,
    keep_inboxes: bool,
    dry_run: bool,
    session: Session,
    service_url: str | None = None,
    collection: str | None = None,
    curator_token: str | None = None,
) -> int:
    """Add the records of inbox `label` to the change set at `change_set`.

    For every selected record from `source` (5-tuples, the record is the
    first element):

    1. The current curated record with the same pid is read from the
       destination, stripped of its `annotations`, written to
       `records/<label>/<file>` (or `null` if there is none), and staged
       with `git add`.
    2. The file is overwritten with the inbox record (without
       `annotations`), so that the worktree diff shows the pending change.
    3. The `annotations` of the inbox record are stored in
       `annotations/<label>/<file>`.
    4. Unless `keep_inboxes` is set, the record is deleted from the inbox.

    Finally, the staged (i.e. current curated) state is committed, if
    anything was staged.

    `service_url`, `collection`, and `curator_token` identify the source
    collection that holds the inbox. For backward compatibility they default
    to the destination values.

    With `dry_run` nothing is written, deleted, or executed; the intended
    actions are printed instead.

    Returns 0 on success, 1 if deleting a record or a git command failed.
    """
    source_service_url = destination_service_url if service_url is None else service_url
    source_collection = destination_collection if collection is None else collection
    source_token = destination_token if curator_token is None else curator_token

    label_dir = change_set / 'records' / label
    annotation_dir = change_set / 'annotations' / label

    if dry_run:
        console.print(f'[DRY_RUN]:CREATING LABEL DIRECTORY: {label_dir}')
    else:
        label_dir.mkdir(parents=True, exist_ok=True)

    if dry_run:
        console.print(f'[DRY_RUN]:CREATING ANNOTATION DIRECTORY: {annotation_dir}')
    else:
        annotation_dir.mkdir(parents=True, exist_ok=True)

    # `git commit` fails if nothing is staged, so remember whether any file
    # was added for this label.
    staged_any = False

    for record, _, _, _, _ in source:
        if not _pid_selected(record['pid'], pid):
            logger.debug(
                'ignoring record with non-matching pid: %s',
                record['pid'])
            continue

        class_name = _get_class_name(record)
        if class_name is None:
            continue

        file_name = _pid_to_filename(record['pid'])
        file_path = label_dir / file_name
        annotation_path = annotation_dir / file_name

        # Fetch the current curated record from the destination, delete its
        # annotations, write it to a file, and add the file to git.
        existing_record = curated_read_record_with_pid(
            service_url=destination_service_url,
            collection=destination_collection,
            pid=record['pid'],
            token=destination_token,
            session=session,
        )
        if existing_record:
            existing_record.pop('annotations', None)
            if dry_run:
                console.print(f'[DRY_RUN]:STORE existing record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{file_path.resolve()}"')
            else:
                file_path.write_text(
                    json.dumps(existing_record, ensure_ascii=False, indent=2) + '\n',
                    encoding='utf8'
                )
        else:
            if dry_run:
                console.print(f'[DRY_RUN]:STORE "null" at "{file_path.resolve()}"')
            else:
                file_path.write_text('null\n', encoding='utf8')

        intra_repo_file_path = file_path.relative_to(change_set)
        if dry_run:
            console.print(f'[DRY_RUN]:GIT ADD {intra_repo_file_path} (cdw={change_set})')
        elif not _run_git(['add', str(intra_repo_file_path)], cwd=change_set):
            return 1
        staged_any = True

        # Write the new record without annotations to disk
        annotations = record.pop('annotations', None)
        if dry_run:
            console.print(f'[DRY_RUN]:STORE new record [green]"{record["pid"]}"[/green] of class "{class_name}" from inbox {label} at "{file_path.resolve()}"')
        else:
            file_path.write_text(
                json.dumps(record, ensure_ascii=False, indent=2) + '\n',
                encoding='utf8'
            )

        if dry_run:
            console.print(f'[DRY_RUN]:STORE annotations for record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{annotation_path.resolve()}"')
        else:
            annotation_path.write_text(
                json.dumps(annotations, ensure_ascii=False) + '\n',
                encoding='utf8'
            )

        if keep_inboxes:
            continue

        if dry_run:
            console.print(f'[DRY_RUN]:DELETING record with pid [green]{record["pid"]}[/green]')
            continue

        # Delete record from incoming area. The incoming area belongs to the
        # source collection, not to the destination.
        try:
            incoming_delete_record(
                service_url=source_service_url,
                collection=source_collection,
                label=label,
                pid=record['pid'],
                token=source_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
            )
            return 1

    if not staged_any:
        logger.debug('nothing staged for label %s, skipping commit', label)
        return 0

    if dry_run:
        console.print(f'[DRY_RUN]:GIT COMMIT in {change_set}')
    elif not _run_git(['commit', '-m', 'commit curated state'], cwd=change_set):
        return 1
    return 0
def _pid_to_filename(pid: str) -> str:
    """Encode `pid` as a file name that contains no `/`.

    Every `-` becomes `--` and every `/` becomes `-_`; all other characters
    are kept unchanged.
    """
    # Both replacements are single-character substitutions whose results are
    # never substituted again, so one translation pass covers them.
    escapes = str.maketrans({'-': '--', '/': '-_'})
    return pid.translate(escapes)
def _post_change_set(
    source: Iterable,
    post_change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    author_id: str | None,
    pid: str | None,
    add_annotations: bool,
    only_if_modifying: bool,
    jsonpath_spec: str | None,
    dry_run: bool,
    session: Session,
) -> int:
    """Post the records of label `label` in a change set to the destination.

    `source` yields the names of the record files in
    `<post_change_set>/records/<label>/`. Every file is read as JSON and
    written to the curated area of `destination_collection` on
    `destination_service_url`.

    - If `pid` is given, only the file that belongs to this pid is used.
    - If `add_annotations` is set, the annotations stored in
      `<post_change_set>/annotations/<label>/<file>` are added to the record.
      A stored `null` means that the original record had no annotations; in
      this case the record is posted without an `annotations` entry.
    - If `only_if_modifying` is set, a record is not posted if the
      destination already holds a record with the same pid that
      `RecordComparer` (which ignores the elements matched by
      `jsonpath_spec`) considers equal.

    Files that cannot be used (invalid JSON, content that is not a JSON
    object, missing or invalid annotation file, no class name) are skipped
    with a warning. With `dry_run` nothing is written; the intended writes
    are printed instead.

    Returns 0 on success, 1 as soon as a write request fails.
    """
    pid_file_name = _pid_to_filename(pid) if pid else None

    record_comparer = None
    if only_if_modifying:
        record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
    elif jsonpath_spec:
        console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')

    for file_name in source:
        if pid_file_name and pid_file_name != file_name:
            logger.debug('ignoring file with non-matching pid: %s', file_name)
            continue

        # Change set files are written as UTF-8, read them the same way
        # instead of relying on the locale encoding.
        record_file = post_change_set / 'records' / label / file_name
        try:
            record = json.loads(record_file.read_text(encoding='utf8'))
        except json.JSONDecodeError as jde:
            console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{record_file}[/yellow], ignoring it.')
            continue

        if not isinstance(record, dict):
            # For example `null`, which is what a change set stores for a
            # record that does not exist in the curated area.
            console.print(f'[yellow]Warning[/yellow]: no record object found at [yellow]{record_file}[/yellow], ignoring it.')
            continue

        if add_annotations:
            annotation_file = post_change_set / 'annotations' / label / file_name
            try:
                annotations = json.loads(annotation_file.read_text(encoding='utf8'))
            except FileNotFoundError:
                console.print(f'[yellow]Warning[/yellow]: annotation file [yellow]{annotation_file}[/yellow] does not exist, ignoring record.')
                continue
            except json.JSONDecodeError as jde:
                console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{annotation_file}[/yellow], ignoring record.')
                continue
            if annotations is not None:
                record['annotations'] = annotations

        class_name = _get_class_name(record)
        if class_name is None:
            console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record.get("pid")}[/yellow], ignoring it.')
            continue

        if record_comparer is not None:
            existing_record = curated_read_record_with_pid(
                service_url=destination_service_url,
                collection=destination_collection,
                pid=record['pid'],
                token=destination_token,
                session=session,
            )
            if existing_record and record_comparer.is_equal(existing_record, record):
                console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
                continue

        if dry_run:
            console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
            continue

        # Store record in destination collection
        try:
            curated_write_record(
                service_url=destination_service_url,
                collection=destination_collection,
                class_name=class_name,
                record=record,
                author_id=author_id,
                token=destination_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
            )
            return 1
    return 0
def _get_change_set_labels(
    change_set: Path,
) -> tuple[str, ...]:
    """Return the inbox labels stored in the change set at `change_set`.

    Every sub-directory of `<change_set>/records` is a label. Plain files in
    that directory are ignored. The labels are returned in sorted order, so
    that the processing order does not depend on the file system. An empty
    tuple is returned if `<change_set>/records` does not exist.
    """
    records_dir = change_set / 'records'
    if not records_dir.is_dir():
        return ()
    return tuple(
        sorted(
            entry.name
            for entry in records_dir.iterdir()
            if entry.is_dir()
        )
    )
def _get_change_set_records(
    change_set: Path,
    label: str,
) -> tuple[str, ...]:
    """Return the names of the record files of `label` in a change set.

    The files are looked up in `<change_set>/records/<label>`. The label is
    used as a literal path component, not as part of a glob pattern, so
    labels that contain characters like `[` or `*` are handled correctly.
    Sub-directories are ignored. The names are returned in sorted order. An
    empty tuple is returned if the label directory does not exist.
    """
    label_dir = change_set / 'records' / label
    if not label_dir.is_dir():
        return ()
    return tuple(
        sorted(
            entry.name
            for entry in label_dir.iterdir()
            if entry.is_file()
        )
    )