814 lines
29 KiB
Python
814 lines
29 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import (
|
|
cast,
|
|
Iterable,
|
|
)
|
|
|
|
import rich_click as click
|
|
from requests import Session
|
|
from rich.console import Console
|
|
from rich.progress import track
|
|
|
|
from ...communicate import (
|
|
HTTPError,
|
|
curated_read_record_with_pid,
|
|
curated_write_record,
|
|
get_session,
|
|
incoming_delete_record,
|
|
incoming_read_labels,
|
|
incoming_read_records,
|
|
)
|
|
from .common.record_comparer import RecordComparer
|
|
|
|
|
|
# Name of this subcommand (presumably used by the CLI entry point to register
# it -- confirm against the caller).
subcommand_name = 'auto-curate'

# The logger is named after the subcommand (same value: 'auto-curate').
logger = logging.getLogger(subcommand_name)

# Diagnostics and progress bars go to stderr; results are written to stdout
# via `click.echo`.
console = Console(file=sys.stderr)

# Becomes True once the "Schema Type Layer" hint has been printed, so the
# hint is shown at most once per process.
stl_info: bool = False
|
|
|
|
|
|
@click.command(short_help='Move records from inbox to curate area of a collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.option(
    '--destination-service-url',
    metavar='DEST_SERVICE_URL',
    help='select a different dump-thing-service, i.e. not SERVICE_URL, as destination for auto-curated records (the default is SERVICE_URL)',
)
@click.option(
    '--destination-collection',
    metavar='DEST_COLLECTION',
    help='select a different collection, i.e. not COLLECTION, as destination for auto-curated records',
)
@click.option(
    '--destination-token',
    metavar='DEST_TOKEN',
    help='if provided, this token will be used to authenticate against DEST_SERVICE_URL, which defaults to SERVICE_URL (the default is the token provided via --token)',
)
@click.option(
    '--create-change-set',
    metavar='CHANGE_SET_DIR',
    help='move inbox content into a change set in the specified directory instead of modifying data on a server. A change set contains all changes that would be performed in the destination collection (as git-repo with modifications). '
         '`annotations` are not included in the modifications, but are stored separately in the change set directory. If the directory does not exist, it will be created. Specifying an existing directory that is not empty is an error. '
         'NOTE: the records will be moved into the change set, i.e., they will be removed from the inboxes, unless `--keep-inboxes` is specified.',
    type=click.Path(
        exists=False,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
)
@click.option(
    '--post-change-set',
    metavar='CHANGE_SET_DIR',
    help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. '
         'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas '
         'are added to the respective records before posting them to the destination (--post-change-set and '
         '--create-change-set are mutually exclusive). If `--only-if-modifying` is provided, a record from the change '
         'set will only be posted if an "identical" record is not already in the destination. Here "identical" means that the '
         'records are identical if the attributes defined in `--jsonpath-spec` are ignored.',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
)
@click.option(
    '--keep-inboxes',
    help='do not remove records from inbox when auto-curating them. By default `auto-curate` (with and without `--create-change-set`) '
         'removes records from the processed inboxes. This flag prevents this removal.',
    default=False,
    is_flag=True,
)
@click.option(
    '--add-annotations',
    help='add annotations when posting a change set via `--post-change-set`. By default the annotations will not be added to the records that are posted. If --post-change-set is not provided, this flag is ignored',
    default=False,
    is_flag=True,
)
@click.option(
    # `--author-id` follows the dash style of all other options; the original
    # `--author_id` spelling stays accepted. The explicit `author_id` keeps
    # the Python parameter name unchanged.
    '--author-id', '--author_id', 'author_id',
    metavar='AUTHOR_ID',
    default=None,
    help='specify an author ID that will be stored in the audit log (AUTHOR_ID must not contain whitespace).',
)
@click.option(
    '--pid', '-p',
    metavar='PID',
    help='if provided, process only records that match the given PID. NOTE: matching does not involve CURIE-resolution',
)
@click.option(
    '--exclude', '-e',
    metavar='INBOX_LABEL',
    help='exclude an inbox on the source collection (repeatable)',
    multiple=True,
)
@click.option(
    '--include', '-i',
    metavar='INBOX_LABEL',
    help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
    multiple=True,
)
@click.option(
    '--list-labels', '-l',
    help='list the inbox labels of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--list-records', '-r',
    help='list records in the inboxes of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--only-if-modifying',
    help='if provided a record will only be posted, if it does not yet exist '
         'in the destination, or if the posted record modifies the existing '
         'record (to ignore certain record entries, for example, `annotations`, '
         'when comparing records, use the option --jsonpath-spec). NOTE: even if '
         'a record is not posted, it will be removed from its inbox (unless '
         '`--keep-inboxes` is provided).',
    default=False,
    is_flag=True,
)
@click.option(
    '--jsonpath-spec',
    metavar='IGNORE_SPEC',
    help='a jsonpath-expression that defines record-attributes, that should be '
         'ignored when checking for existing records. Every element that matches '
         'the expression will be ignored when comparing records (for a '
         'description of the specification syntax check the documentation of '
         'https://pypi.org/project/jsonpath-ng/). For example, to ignore '
         '`annotations` use `--jsonpath-spec "annotations"`.',
    default=None,
    is_flag=False,
)
@click.option(
    '--dry-run', '-d',
    help='if provided, do not alter any data, instead print what would be done',
    default=False,
    is_flag=True,
)
def cli(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    create_change_set,
    post_change_set,
    keep_inboxes,
    add_annotations,
    author_id,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    only_if_modifying,
    jsonpath_spec,
    dry_run,
):
    """Automatically move records from the incoming areas of the collection
    COLLECTION in the service SERVICE_URL to the curated area of the same
    collection, or to the curated area of another collection, possibly on
    another service.

    `auto-curate` can create change sets that reflect what changes would be
    applied to the destination, if records in inboxes were posted to the
    destination. The changes are reflected as worktree diff in a git
    repository (annotations are excluded from the diff).
    `auto-curate` can also read records from a change set and post them to
    a service. If required, annotations can be re-added before the records
    from the change set are posted to the service.

    A token is required and will be used to authenticate the requests.
    The token must have curator-rights."""
    # The docstring above is the `--help` text of the command. All work is
    # delegated to `auto_curate`, whose return value is the exit code.
    try:
        sys.exit(
            auto_curate(
                obj,
                service_url,
                collection,
                destination_service_url,
                destination_collection,
                destination_token,
                create_change_set,
                post_change_set,
                keep_inboxes,
                add_annotations,
                author_id,
                pid,
                exclude,
                include,
                list_labels,
                list_records,
                only_if_modifying,
                jsonpath_spec,
                dry_run,
            )
        )
    except HTTPError as e:
        # `e.response` may be None if the error was raised without a
        # response; do not let the error report itself crash.
        response_text = getattr(e.response, 'text', '')
        console.print(
            f'[red]Error[/red]: {e}: {response_text}',
        )
        sys.exit(1)
|
|
|
|
|
|
def auto_curate(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    create_change_set,
    post_change_set,
    keep_inboxes,
    add_annotations,
    author_id,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    only_if_modifying,
    jsonpath_spec,
    dry_run,
):
    """Execute the `auto-curate` subcommand and return a process exit code.

    `obj` is the curator token handed over by the CLI. Depending on the flags,
    this either:

    - lists inbox labels (`list_labels`) or inbox records (`list_records`) as
      JSON on stdout,
    - records the effect of curation in a git-based change set
      (`create_change_set`),
    - posts a previously created change set (`post_change_set`), or
    - moves the records directly from the inboxes of `collection` on
      `service_url` into the curated area of the destination.

    Destination service, collection and token default to the source values.

    Returns 0 on success, otherwise a non-zero exit code. `HTTPError`s that
    are not handled by the helpers propagate to the caller.
    """
    curator_token = obj

    if curator_token is None:
        console.print('[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)')
        return 1

    if create_change_set:
        if post_change_set:
            console.print('[red]Error[/red]: --create-change-set and --post-change-set must not be specified together')
            return 1

        if list_labels or list_records:
            console.print('[red]Error[/red]: --create-change-set cannot be combined with --list-labels or --list-records')
            return 1

        if create_change_set.exists():
            if any(create_change_set.iterdir()):
                console.print(f'[red]Error[/red]: {create_change_set} already exists and is not empty')
                return 1
        elif dry_run:
            console.print(f'[DRY_RUN]:CREATE DIRECTORY: {create_change_set}')
        else:
            # Missing parent directories are created as well.
            create_change_set.mkdir(parents=True)

        if dry_run:
            console.print(f'[DRY_RUN]:INITIALIZING GIT REPO: {create_change_set}')
        else:
            subprocess.run(
                ['git', 'init', str(create_change_set)],
                capture_output=True,
                check=True,
            )

    if post_change_set and (list_labels or list_records):
        console.print('[red]Error[/red]: --post-change-set cannot be combined with --list-labels or --list-records')
        return 1

    # The destination defaults to the source service, collection, and token.
    if destination_collection is None:
        destination_collection = collection
    if destination_service_url is None:
        destination_service_url = service_url
    if destination_token is None:
        destination_token = curator_token

    session = get_session()

    output = None

    # If --list-labels and --list-records are provided, keep only the latter,
    # because it includes listing of labels (as keys of the output).
    if list_records:
        if list_labels:
            console.print('[yellow]Warning[/yellow]: `-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
            list_labels = False
        output = {}

    if list_labels:
        output = []

    # Get the labels from the service, or from a change set that should be posted.
    if post_change_set:
        all_labels = _get_change_set_labels(post_change_set)
    else:
        all_labels = incoming_read_labels(
            service_url=service_url,
            collection=collection,
            token=curator_token,
            session=session,
        )

    for label in all_labels:
        if include and label not in include:
            logger.debug('ignoring non-included incoming label: %s', label)
            continue

        if label in exclude:
            logger.debug('ignoring excluded incoming label: %s', label)
            continue

        if list_labels:
            output.append(label)
            continue

        # Get the records from the service, or from a change set that should
        # be posted.
        if post_change_set:
            record_source = _get_change_set_records(post_change_set, label)
            final_record_source = track(
                record_source,
                description=f'processing [green]{label}[/green]',
                total=len(record_source),
                console=console,
            )
        else:
            record_source = incoming_read_records(
                service_url=service_url,
                collection=collection,
                label=label,
                token=curator_token,
                session=session,
            )

            # Get the first entry to find the total number of records in the
            # incoming area `label`.
            try:
                first_record, _, _, _, total = next(record_source)
            except StopIteration:
                console.print(f'no records in incoming area [green]{label}[/green], skipping it')
                continue

            # Create a source that returns the first entry, all remaining
            # entries, and tracks progress.
            final_record_source = track(
                chain(
                    [(first_record, None, None, None, None)],
                    cast(Iterable, record_source),
                ),
                description=f'processing [green]{label}[/green]',
                total=total,
                console=console,
            )

        if create_change_set:
            result = _update_change_set(
                source=final_record_source,
                change_set=create_change_set,
                label=label,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                pid=pid,
                keep_inboxes=keep_inboxes,
                dry_run=dry_run,
                session=session,
                # Records were read from the source inbox, so they must also
                # be deleted from there.
                service_url=service_url,
                collection=collection,
                curator_token=curator_token,
            )
        elif list_records:
            output[label] = [record for record, _, _, _, _ in final_record_source]
            continue
        elif post_change_set:
            result = _post_change_set(
                final_record_source,
                post_change_set=post_change_set,
                label=label,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                author_id=author_id,
                pid=pid,
                add_annotations=add_annotations,
                only_if_modifying=only_if_modifying,
                jsonpath_spec=jsonpath_spec,
                dry_run=dry_run,
                session=session,
            )
        else:
            result = _curate_records(
                final_record_source,
                service_url=service_url,
                collection=collection,
                label=label,
                curator_token=curator_token,
                destination_service_url=destination_service_url,
                destination_collection=destination_collection,
                destination_token=destination_token,
                author_id=author_id,
                pid=pid,
                keep_inboxes=keep_inboxes,
                only_if_modifying=only_if_modifying,
                jsonpath_spec=jsonpath_spec,
                dry_run=dry_run,
                session=session,
            )

        if result != 0:
            return result

    if create_change_set:
        click.echo(str(create_change_set))
    elif output is not None:
        click.echo(json.dumps(output, ensure_ascii=False))

    return 0


def _curate_records(
    source: Iterable,
    service_url: str,
    collection: str,
    label: str,
    curator_token: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    author_id: str | None,
    pid: str | None,
    keep_inboxes: bool,
    only_if_modifying: bool,
    jsonpath_spec: str | None,
    dry_run: bool,
    session: Session,
) -> int:
    """Write the records of inbox `label` to the curated area of the destination.

    `source` yields 5-tuples whose first element is the record. If `pid` is
    given, only the record with exactly that PID is processed.

    With `only_if_modifying`, a record is not written if the destination
    already holds a record that `RecordComparer` considers equal. Such a
    record is still removed from the inbox, as documented for
    `--only-if-modifying`.

    Unless `keep_inboxes` is set, every processed record is deleted from
    inbox `label` of `collection` on `service_url`.

    Returns 0 on success and 1 if writing or deleting a record failed.
    """
    record_comparer = None
    if only_if_modifying:
        record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
    elif jsonpath_spec:
        console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')

    for record, _, _, _, _ in source:
        # Exact comparison; `in` on a string would be a substring test.
        if pid and record['pid'] != pid:
            logger.debug(
                'ignoring record with non-matching pid: %s',
                record['pid'])
            continue

        class_name = _get_class_name(record)
        if class_name is None:
            console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.')
            continue

        needs_write = True
        if record_comparer:
            # Compare with the record in the destination, i.e., where the
            # record would be written.
            existing_record = curated_read_record_with_pid(
                service_url=destination_service_url,
                collection=destination_collection,
                pid=record['pid'],
                token=destination_token,
                session=session,
            )
            if existing_record and record_comparer.is_equal(existing_record, record):
                console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
                needs_write = False

        if needs_write:
            if dry_run:
                console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
            else:
                # Store record in destination collection
                try:
                    curated_write_record(
                        service_url=destination_service_url,
                        collection=destination_collection,
                        class_name=class_name,
                        record=record,
                        author_id=author_id,
                        token=destination_token,
                        session=session,
                    )
                except HTTPError as e:
                    console.print(
                        f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {getattr(e.response, "text", "")}',
                    )
                    return 1

        if keep_inboxes:
            continue

        if dry_run:
            console.print(f'[DRY_RUN]:DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
            continue

        # Delete record from incoming area
        try:
            incoming_delete_record(
                service_url=service_url,
                collection=collection,
                label=label,
                pid=record['pid'],
                token=curator_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {getattr(e.response, "text", "")}',
            )
            return 1

    return 0


def _get_class_name(record: dict) -> str | None:
    """Get the class name from the `schema_type` attribute.

    The class name is the trailing run of `[_A-Za-z0-9]` characters of
    `schema_type`, e.g., `Person` for `prefix:Person`. Returns None, after
    printing a warning, if `schema_type` is missing, not a string, or does
    not end in such characters.

    This requires that the schema type is either stored in the record or that
    the store has a "Schema Type Layer", i.e., the store type is
    `record_dir+stl`, or `sqlite+stl`.
    """
    global stl_info

    schema_type = record.get('schema_type')
    if not isinstance(schema_type, str):
        console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record.get("pid")}[/yellow] because `schema_type` attribute is missing.')
        # Explain the "Schema Type Layer" pitfall only once per process.
        if not stl_info:
            console.print(
                ' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
                'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
                '"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
                'storage and will not be returned when retrieving records from the incoming area. '
                'dump-things-service circumvents the "Schema Type Layer" on curator access, i.e., '
                'on access to incoming areas and on access to the curated area. Therefore it '
                'might return records without `schema_type` attributes on curator access to '
                'incoming areas or curated areas. To be consistent, it might be a good idea to '
                'NOT use a "Schema Type Layer" in collections that shall be auto-curated, but '
                'store `schema_type`-attributes in all records.[/yellow]',
            )
            stl_info = True
        return None

    # The pattern can match the empty string, so `re.search` always succeeds.
    class_name = re.search(r'[_A-Za-z0-9]*$', schema_type).group(0)
    if not class_name:
        console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record.get("pid")}[/yellow] because no class name could be extracted from its `schema_type` attribute.')
        return None
    return class_name


def _update_change_set(
    source: Iterable,
    change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    pid: str | None,
    keep_inboxes: bool,
    dry_run: bool,
    session: Session,
    service_url: str | None = None,
    collection: str | None = None,
    curator_token: str | None = None,
) -> int:
    """Record the effect of curating the records of inbox `label` in a change set.

    For every record from `source`:

    1. The current version in the curated area of the destination is written
       to `<change_set>/records/<label>/` and staged with `git add`. It is
       written without `annotations`, or as `null` if no such record exists.
    2. The file is then overwritten with the inbox record (without
       `annotations`), so the git worktree diff shows the change.
    3. The inbox record's annotations are stored in
       `<change_set>/annotations/<label>/`.

    Finally, the staged state is committed, if anything was staged.

    Unless `keep_inboxes` is set, each processed record is deleted from the
    inbox it was read from, i.e., inbox `label` of `collection` on
    `service_url`, using `curator_token`. These three parameters default to
    the destination values for backward compatibility.

    Returns 0 on success and 1 if deleting a record from an inbox failed.
    """
    if service_url is None:
        service_url = destination_service_url
    if collection is None:
        collection = destination_collection
    if curator_token is None:
        curator_token = destination_token

    label_dir = change_set / 'records' / label
    annotation_dir = change_set / 'annotations' / label

    if dry_run:
        console.print(f'[DRY_RUN]:CREATING LABEL DIRECTORY: {label_dir}')
    else:
        label_dir.mkdir(parents=True, exist_ok=True)

    if dry_run:
        console.print(f'[DRY_RUN]:CREATING ANNOTATION DIRECTORY: {annotation_dir}')
    else:
        annotation_dir.mkdir(parents=True, exist_ok=True)

    # `git commit` fails if nothing is staged, so remember whether any file was added.
    staged_any = False

    for record, _, _, _, _ in source:
        # Exact comparison; `in` on a string would be a substring test.
        if pid and record['pid'] != pid:
            logger.debug(
                'ignoring record with non-matching pid: %s',
                record['pid'])
            continue

        class_name = _get_class_name(record)
        if class_name is None:
            continue

        file_name = _pid_to_filename(record['pid'])
        file_path = label_dir / file_name
        annotation_path = annotation_dir / file_name

        # Fetch the current curated record from the destination, delete its
        # annotations, write it to a file, and add the file to git.
        existing_record = curated_read_record_with_pid(
            service_url=destination_service_url,
            collection=destination_collection,
            pid=record['pid'],
            token=destination_token,
            session=session,
        )
        if existing_record:
            existing_record.pop('annotations', None)
            if dry_run:
                console.print(f'[DRY_RUN]:STORE existing record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{file_path.resolve()}"')
            else:
                file_path.write_text(
                    json.dumps(existing_record, ensure_ascii=False, indent=2) + '\n',
                    encoding='utf8'
                )
        elif dry_run:
            console.print(f'[DRY_RUN]:STORE "null" at "{file_path.resolve()}"')
        else:
            file_path.write_text('null\n', encoding='utf8')

        intra_repo_file_path = file_path.relative_to(change_set)
        if dry_run:
            console.print(f'[DRY_RUN]:GIT ADD {intra_repo_file_path} (cwd={change_set})')
        else:
            subprocess.run(
                ['git', 'add', str(intra_repo_file_path)],
                capture_output=True,
                cwd=change_set,
                check=True,
            )
        staged_any = True

        # Write the new record without annotations to disk (unstaged, so it
        # shows up as a worktree modification).
        annotations = record.pop('annotations', None)
        if dry_run:
            console.print(f'[DRY_RUN]:STORE new record [green]"{record["pid"]}"[/green] of class "{class_name}" from inbox {label} at "{file_path.resolve()}"')
        else:
            file_path.write_text(
                json.dumps(record, ensure_ascii=False, indent=2) + '\n',
                encoding='utf8'
            )

        if dry_run:
            console.print(f'[DRY_RUN]:STORE annotations for record [green]"{record["pid"]}"[/green] of class "{class_name}" from inbox {label} at "{annotation_path.resolve()}"')
        else:
            annotation_path.write_text(
                json.dumps(annotations, ensure_ascii=False) + '\n',
                encoding='utf8'
            )

        if keep_inboxes:
            continue

        if dry_run:
            console.print(f'[DRY_RUN]:DELETING record with pid [green]{record["pid"]}[/green]')
            continue

        # Delete record from the incoming area it was read from
        try:
            incoming_delete_record(
                service_url=service_url,
                collection=collection,
                label=label,
                pid=record['pid'],
                token=curator_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {getattr(e.response, "text", "")}',
            )
            return 1

    if not staged_any:
        logger.debug('nothing to commit for incoming label: %s', label)
        return 0

    if dry_run:
        console.print(f'[DRY_RUN]:GIT COMMIT in {change_set}')
    else:
        subprocess.run(
            ['git', 'commit', '-m', 'commit curated state'],
            capture_output=True,
            cwd=change_set,
            check=True,
        )

    return 0
|
|
|
|
|
|
def _pid_to_filename(pid: str) -> str:
|
|
return pid.replace('-', '--').replace('/', '-_')
|
|
|
|
|
|
def _post_change_set(
    source: Iterable,
    post_change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    author_id: str | None,
    pid: str | None,
    add_annotations: bool,
    only_if_modifying: bool,
    jsonpath_spec: str | None,
    dry_run: bool,
    session: Session,
) -> int:
    """Post the records stored for `label` in a change set to the destination.

    `source` yields the file names of the records in
    `<post_change_set>/records/<label>`. If `add_annotations` is set, the
    annotations stored under the same file name in
    `<post_change_set>/annotations/<label>` are re-attached before posting.

    A record is skipped, with a warning, if:

    - its file cannot be parsed or does not contain a JSON object,
    - its annotations cannot be read,
    - it has no usable `schema_type`, or
    - with `only_if_modifying`, an equal record already exists in the
      destination (the skip message is informational, not a warning).

    Returns 0 on success and 1 if writing a record failed.
    """
    # `source` yields file names, so the requested PID is compared in its
    # file-name form.
    pid_file_name = _pid_to_filename(pid) if pid else None

    record_comparer = None
    if only_if_modifying:
        record_comparer = RecordComparer(jsonpath_spec=jsonpath_spec)
    elif jsonpath_spec:
        console.print('[yellow]Warning[/yellow]: ignoring --jsonpath-spec because --only-if-modifying was not provided')

    record_dir = post_change_set / 'records' / label
    annotation_dir = post_change_set / 'annotations' / label

    for file_name in source:
        if pid_file_name and pid_file_name != file_name:
            logger.debug('ignoring file with non-matching pid: %s', file_name)
            continue

        record_file = record_dir / file_name
        try:
            # Change sets are written as UTF-8; do not depend on the locale.
            record = json.loads(record_file.read_text(encoding='utf8'))
        except json.JSONDecodeError as jde:
            console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{record_file}[/yellow], ignoring it.')
            continue

        # A file may have been edited to contain `null` or other non-record
        # JSON; such a file cannot be posted.
        if not isinstance(record, dict):
            console.print(f'[yellow]Warning[/yellow]: no record found in [yellow]{record_file}[/yellow], ignoring it.')
            continue

        if add_annotations:
            annotation_file = annotation_dir / file_name
            try:
                annotations = json.loads(annotation_file.read_text(encoding='utf8'))
            except FileNotFoundError:
                console.print(f'[yellow]Warning[/yellow]: missing annotation file [yellow]{annotation_file}[/yellow], ignoring record.')
                continue
            except json.JSONDecodeError as jde:
                console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{annotation_file}[/yellow], ignoring record.')
                continue
            # When the inbox record had no annotations, the change set stores
            # `null`; do not add an explicit `annotations: null` attribute.
            if annotations is not None:
                record['annotations'] = annotations

        class_name = _get_class_name(record)
        if class_name is None:
            console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record["pid"]}[/yellow], ignoring it.')
            continue

        if record_comparer:
            existing_record = curated_read_record_with_pid(
                service_url=destination_service_url,
                collection=destination_collection,
                pid=record['pid'],
                token=destination_token,
                session=session,
            )
            if existing_record and record_comparer.is_equal(existing_record, record):
                console.print(f'skipping writing of record [green]{record["pid"]}[/green] because a matching record already exists')
                continue

        if dry_run:
            console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
            continue

        # Store record in destination collection
        try:
            curated_write_record(
                service_url=destination_service_url,
                collection=destination_collection,
                class_name=class_name,
                record=record,
                author_id=author_id,
                token=destination_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {getattr(e.response, "text", "")}',
            )
            return 1

    return 0
|
|
|
|
|
|
def _get_change_set_labels(
|
|
change_set: Path,
|
|
) -> tuple[str]:
|
|
return tuple(
|
|
map(
|
|
lambda match: match.name,
|
|
change_set.glob(f'records/*')
|
|
)
|
|
)
|
|
|
|
|
|
def _get_change_set_records(
|
|
change_set: Path,
|
|
label: str,
|
|
) -> tuple[str]:
|
|
return tuple(
|
|
map(
|
|
lambda match: match.name,
|
|
change_set.glob(f'records/{label}/*')
|
|
)
|
|
)
|