Add --create-change-set and --post-change-set option to dtc-subcommand auto-curate #34
17 changed files with 2322 additions and 809 deletions
31
.forgejo/workflows/run_tests.yaml
Normal file
31
.forgejo/workflows/run_tests.yaml
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
name: Test execution
|
||||
on: [push]
|
||||
jobs:
|
||||
Test-all:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up git
|
||||
run : |
|
||||
git config --global user.email "tester@example.com"
|
||||
git config --global user.name "Automated Tester"
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v6
|
||||
|
||||
- name: Create virtual environment
|
||||
run : |
|
||||
uv venv
|
||||
|
||||
- name: Install this package
|
||||
run : |
|
||||
uv pip install .
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
uv run --extra tests pytest
|
||||
|
|
@ -1 +0,0 @@
|
|||
3.13
|
||||
23
CHANGELOG.md
23
CHANGELOG.md
|
|
@ -1,3 +1,25 @@
|
|||
# 0.2.13 (2026-03-03)
|
||||
|
||||
## New features
|
||||
|
||||
- The `dtc` subcommand `post-records` now accepts the class name `*`. If this
|
||||
class name is given, it will try to infer the class name from the
|
||||
`schema_type` property of the input records. `post-records` also got the new
|
||||
options `--ignore-errors` and `--dry-run`.
|
||||
|
||||
- The `dtc` subcommand `auto-curate` has the new options
|
||||
`--create-change-set CHANGE_SET_DIR` and `--post-change-set`. With
|
||||
`--create-change-set`, `auto-curate` will generate a
|
||||
git repository in `CHANGE_SET_DIR` that contains the changes that
|
||||
would be introduced by the curation in the git worktree. The records in the
|
||||
change set have no `annotations`-property, annotations are store separately
|
||||
in the change set change set. If `--post-change-set` is provided,
|
||||
`auto-curate` will post the records from the change set to the curated area
|
||||
of the service. With
|
||||
the option `--add-annotations` the annotations that were read from the inbox
|
||||
will be added to the records before they are posted to the service.
|
||||
|
||||
|
||||
# 0.2.12 (2026-02-11)
|
||||
|
||||
## New features
|
||||
|
|
@ -9,7 +31,6 @@
|
|||
`import`, and `delete-records`.
|
||||
|
||||
- Ensure that all `dtc` subcommands return an non-zero exit status on error.
|
||||
|
||||
|
||||
## Improvements
|
||||
|
||||
|
|
|
|||
|
|
@ -1,22 +1,23 @@
|
|||
import json
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from itertools import (
|
||||
count,
|
||||
chain,
|
||||
)
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
cast,
|
||||
Iterable,
|
||||
)
|
||||
|
||||
import rich_click as click
|
||||
from requests import Session
|
||||
from rich.console import Console
|
||||
from rich.progress import track
|
||||
|
||||
from ...communicate import (
|
||||
HTTPError,
|
||||
curated_read_record_with_pid,
|
||||
curated_write_record,
|
||||
get_session,
|
||||
incoming_delete_record,
|
||||
|
|
@ -25,6 +26,8 @@ from ...communicate import (
|
|||
)
|
||||
|
||||
|
||||
subcommand_name = 'auto-curate'
|
||||
|
||||
logger = logging.getLogger('auto-curate')
|
||||
|
||||
console = Console(file=sys.stderr)
|
||||
|
|
@ -56,6 +59,42 @@ stl_info = False
|
|||
metavar='DEST_TOKEN',
|
||||
help='if provided, this token will be used the authenticate against DEST_SERVICE_URL, which defaults to SERVICE_URL (the default is the token provided via --token)',
|
||||
)
|
||||
@click.option(
|
||||
'--create-change-set',
|
||||
metavar='CHANGE_SET_DIR',
|
||||
help='write a change set to the specified directory instead of modifying data on a server. A change set contains all changes that would be performed in the destination collection (as git-repo with modifications). '
|
||||
'`annotations` are not included in the modifications, but are stored separately in the change set directory. If the directory does not exist, it will be created. Specifying an existing directory that is not empty is an error',
|
||||
type=click.Path(
|
||||
exists=False,
|
||||
file_okay=False,
|
||||
dir_okay=True,
|
||||
writable=True,
|
||||
allow_dash=False,
|
||||
path_type=Path,
|
||||
),
|
||||
)
|
||||
@click.option(
|
||||
'--post-change-set',
|
||||
metavar='CHANGE_SET_DIR',
|
||||
help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. '
|
||||
'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas '
|
||||
'are added to the respective records before posting them to the destination (--post-change-set and '
|
||||
'--create-change-set are mutually exclusive)',
|
||||
type=click.Path(
|
||||
exists=True,
|
||||
file_okay=False,
|
||||
dir_okay=True,
|
||||
writable=True,
|
||||
allow_dash=False,
|
||||
path_type=Path,
|
||||
),
|
||||
)
|
||||
@click.option(
|
||||
'--add-annotations',
|
||||
help='add annotations when posting a change set via `--post-change-set`. By default the annotations will not be added to the records that are posted. If --post-change-set is not provided, this flag is ignored',
|
||||
default=False,
|
||||
is_flag=True,
|
||||
)
|
||||
@click.option(
|
||||
'--pid', '-p',
|
||||
metavar='PID',
|
||||
|
|
@ -63,11 +102,13 @@ stl_info = False
|
|||
)
|
||||
@click.option(
|
||||
'--exclude', '-e',
|
||||
metavar='INBOX_LABEL',
|
||||
help='exclude an inbox on the source collection (repeatable)',
|
||||
multiple=True,
|
||||
)
|
||||
@click.option(
|
||||
'--include', '-i',
|
||||
metavar='INBOX_LABEL',
|
||||
help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
|
||||
multiple=True,
|
||||
)
|
||||
|
|
@ -96,6 +137,9 @@ def cli(
|
|||
destination_service_url,
|
||||
destination_collection,
|
||||
destination_token,
|
||||
create_change_set,
|
||||
post_change_set,
|
||||
add_annotations,
|
||||
pid,
|
||||
exclude,
|
||||
include,
|
||||
|
|
@ -108,6 +152,14 @@ def cli(
|
|||
collection, or to the curated area of another collection, possibly on
|
||||
another service.
|
||||
|
||||
`auto-curate` can create change sets that reflect what changes would be
|
||||
applied to the destination, if records in inboxes were posted to the
|
||||
destination. The changes are reflected as worktree diff in a git
|
||||
repository (annotations are excluded from the diff).
|
||||
`auto-curate` can also read records from a change set and post them
|
||||
a service. If required, annotations can be re-added before the records
|
||||
from the change set are posted to the service.
|
||||
|
||||
A token is required and will be used to authenticate the requests.
|
||||
The token must have curator-rights."""
|
||||
try:
|
||||
|
|
@ -119,6 +171,9 @@ def cli(
|
|||
destination_service_url,
|
||||
destination_collection,
|
||||
destination_token,
|
||||
create_change_set,
|
||||
post_change_set,
|
||||
add_annotations,
|
||||
pid,
|
||||
exclude,
|
||||
include,
|
||||
|
|
@ -141,6 +196,9 @@ def auto_curate(
|
|||
destination_service_url,
|
||||
destination_collection,
|
||||
destination_token,
|
||||
create_change_set,
|
||||
post_change_set,
|
||||
add_annotations,
|
||||
pid,
|
||||
exclude,
|
||||
include,
|
||||
|
|
@ -154,6 +212,35 @@ def auto_curate(
|
|||
console.print(f'[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)')
|
||||
return 1
|
||||
|
||||
if create_change_set:
|
||||
if post_change_set:
|
||||
console.print(f'[red]Error[/red]: --create-change-set and --post-change-set must not be specified together')
|
||||
return 1
|
||||
|
||||
if list_labels or list_records:
|
||||
console.print(f'[red]Error[/red]: --create-change-set cannot be combined with --list-labels or --list-records')
|
||||
return 1
|
||||
|
||||
if create_change_set.exists():
|
||||
if tuple(create_change_set.glob('*')) != tuple():
|
||||
console.print(f'[red]Error[/red]: {create_change_set} already exists and is not empty')
|
||||
return 1
|
||||
else:
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:CREATE DIRECORY: {create_change_set}')
|
||||
else:
|
||||
create_change_set.mkdir()
|
||||
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:INITIALIZING GIT REPO: {create_change_set}')
|
||||
else:
|
||||
subprocess.run(['git', 'init', str(create_change_set)], check=1)
|
||||
|
||||
if post_change_set:
|
||||
if list_labels or list_records:
|
||||
console.print(f'[red]Error[/red]: --post-change-set cannot be combined with --list-labels or --list-records')
|
||||
return 1
|
||||
|
||||
if destination_collection is None:
|
||||
destination_collection = collection
|
||||
|
||||
|
|
@ -163,26 +250,32 @@ def auto_curate(
|
|||
if destination_token is None:
|
||||
destination_token = curator_token
|
||||
|
||||
session = get_session()
|
||||
|
||||
output = None
|
||||
|
||||
# If --list-labels and --list-records are provided, keep only the latter,
|
||||
# because it includes listing of labels
|
||||
if list_records:
|
||||
if list_labels:
|
||||
logger.warning('`-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
|
||||
console.print(f'[yellow]Warning[/yellow]: `-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
|
||||
list_labels = False
|
||||
output = {}
|
||||
|
||||
if list_labels:
|
||||
output = []
|
||||
|
||||
session = get_session()
|
||||
all_labels = incoming_read_labels(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
token=obj,
|
||||
session=session,
|
||||
)
|
||||
# Get the labels from the service, or from a change set that should be posted.
|
||||
if post_change_set:
|
||||
all_labels = _get_change_set_labels(post_change_set)
|
||||
else:
|
||||
all_labels = incoming_read_labels(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
token=obj,
|
||||
session=session,
|
||||
)
|
||||
|
||||
for label in all_labels:
|
||||
if include and label not in include:
|
||||
logger.debug('ignoring non-included incoming label: %s', label)
|
||||
|
|
@ -196,109 +289,97 @@ def auto_curate(
|
|||
output.append(label)
|
||||
continue
|
||||
|
||||
if list_records:
|
||||
output[label] = []
|
||||
# Get the records from the service, or from a change set that should
|
||||
# be posted.
|
||||
if post_change_set:
|
||||
record_source = _get_change_set_records(post_change_set, label)
|
||||
final_record_source = track(
|
||||
record_source,
|
||||
description=f'processing [green]{label}[/green]',
|
||||
total=len(record_source),
|
||||
console=console,
|
||||
)
|
||||
|
||||
# Get the total number of entries for the
|
||||
record_source = incoming_read_records(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
label=label,
|
||||
token=obj,
|
||||
session=session,
|
||||
)
|
||||
else:
|
||||
record_source = incoming_read_records(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
label=label,
|
||||
token=obj,
|
||||
session=session,
|
||||
)
|
||||
|
||||
# Get the first entry to find the total number of records
|
||||
try:
|
||||
first_record, _, _, _, total = next(record_source)
|
||||
except StopIteration:
|
||||
console.print(f'no records in incoming area [green]{label}[/green], skipping it')
|
||||
continue
|
||||
# Get the first entry to find the total number of records in the
|
||||
# incoming area `label`.
|
||||
try:
|
||||
first_record, _, _, _, total = next(record_source)
|
||||
except StopIteration:
|
||||
console.print(f'no records in incoming area [green]{label}[/green], skipping it')
|
||||
continue
|
||||
|
||||
# Get the first entry an all other entries
|
||||
for index, (record, _, _, _, total) in track(
|
||||
zip(
|
||||
count(),
|
||||
chain(
|
||||
[(first_record, None, None, None, None)],
|
||||
cast(Iterable, record_source),
|
||||
),
|
||||
# Create a source that returns the first entry, all remaining entries,
|
||||
# and tracks progress.
|
||||
final_record_source = track(
|
||||
chain(
|
||||
[(first_record, None, None, None, None)],
|
||||
cast(Iterable, record_source),
|
||||
),
|
||||
description=f'processing [green]{label}[/green]',
|
||||
total=total,
|
||||
console=console,
|
||||
):
|
||||
if list_records:
|
||||
output[label].append(record)
|
||||
continue
|
||||
)
|
||||
|
||||
if pid:
|
||||
if record['pid'] not in pid:
|
||||
logger.debug(
|
||||
'ignoring record with non-matching pid: %s',
|
||||
record['pid'])
|
||||
continue
|
||||
if create_change_set:
|
||||
result = _update_change_set(
|
||||
source=final_record_source,
|
||||
change_set=create_change_set,
|
||||
label=label,
|
||||
destination_service_url=destination_service_url,
|
||||
destination_collection=destination_collection,
|
||||
destination_token=destination_token,
|
||||
pid=pid,
|
||||
dry_run=dry_run,
|
||||
session=session,
|
||||
)
|
||||
if result != 0:
|
||||
return result
|
||||
|
||||
# Get the class name from the `schema_type` attribute. This requires
|
||||
# that the schema type is either stored in the record or that the
|
||||
# store has a "Schema Type Layer", i.e., the store type is
|
||||
# `record_dir+stl`, or `sqlite+stl`.
|
||||
try:
|
||||
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
|
||||
except (IndexError, KeyError):
|
||||
global stl_info
|
||||
console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record["pid"]}[/yellow] because `schema_type` attribute is missing.')
|
||||
if not stl_info:
|
||||
console.print(
|
||||
' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
|
||||
'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
|
||||
'"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
|
||||
'storage and will not be returned when retrieving records from the incoming area. '
|
||||
'dump-things-service <= 5.4.0 circumvented the "Schema Type Layer", therefore they '
|
||||
'will return records without `schema_type` attributes on curator access to '
|
||||
'incoming areas or curated areas. Therefore it might be a good idea to NOT use a '
|
||||
'"Schema Type Layer" in collections that shall be auto-curated, when using '
|
||||
'dump-things-service <= 5.4.0.[/yellow]',
|
||||
)
|
||||
stl_info = True
|
||||
continue
|
||||
elif list_records:
|
||||
output[label] = [record for record, _, _, _, _, in final_record_source]
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
console.print(f'WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
|
||||
console.print(f'DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
|
||||
continue
|
||||
elif post_change_set:
|
||||
result = _post_change_set(
|
||||
final_record_source,
|
||||
post_change_set=post_change_set,
|
||||
label=label,
|
||||
destination_service_url=destination_service_url,
|
||||
destination_collection=destination_collection,
|
||||
destination_token=destination_token,
|
||||
pid=pid,
|
||||
add_annotations=add_annotations,
|
||||
dry_run=dry_run,
|
||||
session=session,
|
||||
)
|
||||
if result != 0:
|
||||
return result
|
||||
|
||||
# Store record in destination collection
|
||||
try:
|
||||
curated_write_record(
|
||||
service_url=destination_service_url,
|
||||
collection=destination_collection,
|
||||
class_name=class_name,
|
||||
record=record,
|
||||
token=destination_token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
|
||||
)
|
||||
return 1
|
||||
|
||||
# Delete record from incoming area
|
||||
try:
|
||||
incoming_delete_record(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
label=label,
|
||||
pid=record['pid'],
|
||||
token=curator_token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
|
||||
)
|
||||
return 1
|
||||
else:
|
||||
result = _curate_records(
|
||||
final_record_source,
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
label=label,
|
||||
curator_token=curator_token,
|
||||
destination_service_url=destination_service_url,
|
||||
destination_collection=destination_collection,
|
||||
destination_token=destination_token,
|
||||
pid=pid,
|
||||
dry_run=dry_run,
|
||||
session=session,
|
||||
)
|
||||
if result != 0:
|
||||
return result
|
||||
|
||||
if output is not None:
|
||||
click.echo(json.dumps(output, ensure_ascii=False))
|
||||
|
|
@ -306,4 +387,277 @@ def auto_curate(
|
|||
return 0
|
||||
|
||||
|
||||
subcommand_name = 'auto-curate'
|
||||
def _curate_records(
|
||||
source: Iterable,
|
||||
service_url: str,
|
||||
collection: str,
|
||||
label: str,
|
||||
curator_token: str,
|
||||
destination_service_url: str,
|
||||
destination_collection: str,
|
||||
destination_token: str,
|
||||
pid: str | None,
|
||||
dry_run: bool,
|
||||
session: Session,
|
||||
) -> int:
|
||||
for record, _, _, _, _ in source:
|
||||
if pid:
|
||||
if record['pid'] not in pid:
|
||||
logger.debug(
|
||||
'ignoring record with non-matching pid: %s',
|
||||
record['pid'])
|
||||
continue
|
||||
|
||||
class_name = _get_class_name(record)
|
||||
if class_name is None:
|
||||
console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.')
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
|
||||
console.print(f'[DRY_RUN]:DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
|
||||
continue
|
||||
|
||||
# Store record in destination collection
|
||||
try:
|
||||
curated_write_record(
|
||||
service_url=destination_service_url,
|
||||
collection=destination_collection,
|
||||
class_name=class_name,
|
||||
record=record,
|
||||
token=destination_token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
|
||||
)
|
||||
return 1
|
||||
|
||||
# Delete record from incoming area
|
||||
try:
|
||||
incoming_delete_record(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
label=label,
|
||||
pid=record['pid'],
|
||||
token=curator_token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
|
||||
)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
def _get_class_name(record: dict) -> str | None:
|
||||
"""Get the class name from the `schema_type` attribute.
|
||||
|
||||
This requires that the schema type is either stored in the record or that
|
||||
the store has a "Schema Type Layer", i.e., the store type is
|
||||
`record_dir+stl`, or `sqlite+stl`.
|
||||
"""
|
||||
try:
|
||||
return re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
|
||||
except (IndexError, KeyError):
|
||||
global stl_info
|
||||
console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record["pid"]}[/yellow] because `schema_type` attribute is missing.')
|
||||
if not stl_info:
|
||||
console.print(
|
||||
' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
|
||||
'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
|
||||
'"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
|
||||
'storage and will not be returned when retrieving records from the incoming area.'
|
||||
'dump-things-service circumvents the "Schema Type Layer" on curator access, i.e., '
|
||||
'on access to incoming areas and on access to the curated area. Therefore it '
|
||||
'might return records without `schema_type` attributes on curator access to '
|
||||
'incoming areas or curated areas. To be consistent, it might be a good idea to '
|
||||
'NOT use a "Schema Type Layer" in collections that shall be auto-curated, but '
|
||||
'store `schema_type`-attributes in all records.[/yellow]',
|
||||
)
|
||||
stl_info = True
|
||||
return None
|
||||
|
||||
|
||||
def _update_change_set(
|
||||
source: Iterable,
|
||||
change_set: Path,
|
||||
label: str,
|
||||
destination_service_url: str,
|
||||
destination_collection: str,
|
||||
destination_token: str,
|
||||
pid: str | None,
|
||||
dry_run: bool,
|
||||
session: Session,
|
||||
) -> int:
|
||||
label_dir = change_set / 'records' / label
|
||||
annotation_dir = change_set / 'annotations' / label
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:CREATING LABEL DIRECTORY: {label_dir}')
|
||||
else:
|
||||
label_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:CREATING ANNOTATION DIRECTORY: {annotation_dir}')
|
||||
else:
|
||||
annotation_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for record, _, _, _, _ in source:
|
||||
if pid:
|
||||
if record['pid'] not in pid:
|
||||
logger.debug(
|
||||
'ignoring record with non-matching pid: %s',
|
||||
record['pid'])
|
||||
continue
|
||||
|
||||
class_name = _get_class_name(record)
|
||||
if class_name is None:
|
||||
continue
|
||||
|
||||
file_path = label_dir / _pid_to_filename(record['pid'])
|
||||
annotation_path = annotation_dir / _pid_to_filename(record['pid'])
|
||||
|
||||
# Fetch the current curated record from the destination, delete its
|
||||
# annotations, write it to a file, and add the file to git.
|
||||
existing_record = curated_read_record_with_pid(
|
||||
service_url=destination_service_url,
|
||||
collection=destination_collection,
|
||||
pid=record['pid'],
|
||||
token=destination_token,
|
||||
session=session,
|
||||
)
|
||||
if existing_record:
|
||||
existing_record.pop('annotations', None)
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:STORE existing record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{file_path.resolve()}"')
|
||||
else:
|
||||
file_path.write_text(
|
||||
json.dumps(existing_record, ensure_ascii=False) + '\n',
|
||||
encoding='utf8'
|
||||
)
|
||||
else:
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:STORE "null" at "{file_path.resolve()}"')
|
||||
else:
|
||||
file_path.write_text('null\n')
|
||||
|
||||
intra_repo_file_path = file_path.relative_to(change_set)
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:GIT ADD {intra_repo_file_path} (cdw={change_set})')
|
||||
else:
|
||||
subprocess.run(['git', 'add', str(intra_repo_file_path)], cwd=change_set, check=True)
|
||||
|
||||
# Write the new record without annotations to disk
|
||||
annotations = record.pop('annotations', None)
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:STORE new record [green]"{record["pid"]}"[/green] of class "{class_name}" from inbox {label} at "{file_path.resolve()}"')
|
||||
else:
|
||||
file_path.write_text(
|
||||
json.dumps(record, ensure_ascii=False) + '\n',
|
||||
encoding='utf8'
|
||||
)
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:STORE annotations for record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{annotation_path.resolve()}"')
|
||||
else:
|
||||
annotation_path.write_text(
|
||||
json.dumps(annotations, ensure_ascii=False) + '\n',
|
||||
encoding='utf8'
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:GIT COMMIT in {change_set}')
|
||||
else:
|
||||
subprocess.run(['git', 'commit', '-m', 'commit curated state'], cwd=change_set, check=True)
|
||||
return 0
|
||||
|
||||
|
||||
def _pid_to_filename(pid: str) -> str:
|
||||
return pid.replace('-', '--').replace('/', '-_')
|
||||
|
||||
|
||||
def _post_change_set(
|
||||
source: Iterable,
|
||||
post_change_set: Path,
|
||||
label: str,
|
||||
destination_service_url: str,
|
||||
destination_collection: str,
|
||||
destination_token: str,
|
||||
pid: str | None,
|
||||
add_annotations: bool,
|
||||
dry_run: bool,
|
||||
session: Session,
|
||||
):
|
||||
pid_file_name = _pid_to_filename(pid) if pid else None
|
||||
for file_name in source:
|
||||
|
||||
if pid_file_name and pid_file_name != file_name:
|
||||
logger.debug('ignoring file with non-matching pid: %s', pid_file_name)
|
||||
continue
|
||||
|
||||
try:
|
||||
record_file = post_change_set / 'records' / label / file_name
|
||||
record = json.loads(record_file.read_text())
|
||||
except json.JSONDecodeError as jde:
|
||||
console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{record_file}[/yellow], ignoring it.')
|
||||
continue
|
||||
|
||||
if add_annotations:
|
||||
annotation_file = post_change_set / 'annotations' / label / file_name
|
||||
try:
|
||||
annotations = json.loads(annotation_file.read_text())
|
||||
except json.JSONDecodeError as jde:
|
||||
console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{annotation_file}[/yellow], ignoring record.')
|
||||
continue
|
||||
record['annotations'] = annotations
|
||||
|
||||
class_name = _get_class_name(record)
|
||||
if class_name is None:
|
||||
console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record["pid"]}[/yellow], ignoring it.')
|
||||
continue
|
||||
|
||||
if dry_run:
|
||||
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
|
||||
continue
|
||||
|
||||
# Store record in destination collection
|
||||
try:
|
||||
curated_write_record(
|
||||
service_url=destination_service_url,
|
||||
collection=destination_collection,
|
||||
class_name=class_name,
|
||||
record=record,
|
||||
token=destination_token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
|
||||
)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def _get_change_set_labels(
|
||||
change_set: Path,
|
||||
) -> tuple[str]:
|
||||
return tuple(
|
||||
map(
|
||||
lambda match: match.name,
|
||||
change_set.glob(f'records/*')
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _get_change_set_records(
|
||||
change_set: Path,
|
||||
label: str,
|
||||
) -> tuple[str]:
|
||||
return tuple(
|
||||
map(
|
||||
lambda match: match.name,
|
||||
change_set.glob(f'records/{label}/*')
|
||||
)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
|
||||
|
||||
def de_prefix(name: str) -> str:
|
||||
return name.split(':', 1)[-1]
|
||||
|
|
@ -22,6 +22,7 @@ from ...communicate import (
|
|||
incoming_read_records,
|
||||
server,
|
||||
)
|
||||
from .common.prefix import de_prefix
|
||||
|
||||
|
||||
subcommand_name = 'export'
|
||||
|
|
@ -243,7 +244,7 @@ def _store_records(
|
|||
continue
|
||||
break
|
||||
|
||||
class_name = _de_prefix(schema_type)
|
||||
class_name = de_prefix(schema_type)
|
||||
if not keep_schema_type:
|
||||
del record['schema_type']
|
||||
|
||||
|
|
@ -265,12 +266,6 @@ def _store_records(
|
|||
return failed
|
||||
|
||||
|
||||
def _de_prefix(
|
||||
name: str,
|
||||
):
|
||||
return name.split(':', 1)[-1]
|
||||
|
||||
|
||||
def _get_hex_digest(
|
||||
data: str,
|
||||
) -> str:
|
||||
|
|
|
|||
|
|
@ -13,8 +13,11 @@ from ...communicate import (
|
|||
collection_write_record,
|
||||
get_session,
|
||||
)
|
||||
from .common.prefix import de_prefix
|
||||
|
||||
|
||||
subcommand_name = 'post-records'
|
||||
|
||||
logger = logging.getLogger('post-records')
|
||||
|
||||
console = Console(file=sys.stderr)
|
||||
|
|
@ -40,18 +43,35 @@ console = Console(file=sys.stderr)
|
|||
is_flag=True,
|
||||
help='store record directly in curated area instead of an inbox. (Note: requires a token with curator rights)'
|
||||
)
|
||||
@click.option(
|
||||
'--ignore-errors',
|
||||
default=False,
|
||||
is_flag=True,
|
||||
help='ignore errors when posting a record and continue with remaining records',
|
||||
)
|
||||
@click.option(
|
||||
'--dry-run', '-d',
|
||||
help='if provided, do not alter any data, instead print what would be done',
|
||||
default=False,
|
||||
is_flag=True,
|
||||
)
|
||||
def cli(
|
||||
obj,
|
||||
service_url,
|
||||
collection,
|
||||
cls,
|
||||
curated,
|
||||
ignore_errors,
|
||||
dry_run,
|
||||
):
|
||||
"""Read records of class CLASS from standard input and store them in
|
||||
the collection COLLECTION on the service SERVICE_URL. Records should be
|
||||
provided in JSON-lines format. Note: all records are assumed to be of class
|
||||
CLASS. To submit records of multiple classes, the subcommand has to be
|
||||
invoked multiple times, once for each class.
|
||||
provided in JSON-lines format. Note: if CLASS is `*` post-records will try
|
||||
to infer the class from a `schema_type` property in the input records.
|
||||
Otherwise, all records are
|
||||
assumed to be of class CLASS. In this case, to submit records of multiple
|
||||
classes, the
|
||||
subcommand has to be invoked multiple times, once for each class.
|
||||
|
||||
If the `--curated`-option is provided, the records will be stored directly
|
||||
in the curated area of the collection without any alterations, i.e, no
|
||||
|
|
@ -62,6 +82,9 @@ def cli(
|
|||
annotated with the submission time and the user that performed
|
||||
the submission.
|
||||
|
||||
If any record could not be posted, error information about the failing pids
|
||||
will be written to stdout in JSON Lines format (one line per faulty record).
|
||||
|
||||
A token is required and will be used to authenticate the requests.
|
||||
If the `--curated`-option is provided, the token must have
|
||||
curator-rights."""
|
||||
|
|
@ -72,6 +95,8 @@ def cli(
|
|||
collection,
|
||||
cls,
|
||||
curated,
|
||||
ignore_errors,
|
||||
dry_run,
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -82,6 +107,8 @@ def post_records(
|
|||
collection,
|
||||
cls,
|
||||
curated,
|
||||
ignore_errors,
|
||||
dry_run,
|
||||
) -> int:
|
||||
token = obj
|
||||
if token is None:
|
||||
|
|
@ -93,35 +120,120 @@ def post_records(
|
|||
else:
|
||||
write_record = collection_write_record
|
||||
|
||||
failed = []
|
||||
session = get_session()
|
||||
for index, line in zip(count(), track(sys.stdin, console=console)):
|
||||
|
||||
try:
|
||||
record = json.loads(line)
|
||||
except Exception as e:
|
||||
console.print(
|
||||
f'[red]Error: reading JSON record #{index} failed[/red]: {e}',
|
||||
)
|
||||
_handle_json_load_error(failed, index, line, e)
|
||||
if ignore_errors:
|
||||
continue
|
||||
break
|
||||
|
||||
if cls == '*':
|
||||
class_name = get_class_from_record(record)
|
||||
if class_name is None:
|
||||
_handle_schema_type_error(failed, index, record)
|
||||
if ignore_errors:
|
||||
continue
|
||||
break
|
||||
else:
|
||||
class_name = cls
|
||||
|
||||
if dry_run:
|
||||
if curated:
|
||||
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to curated area of collection "{collection}" on "{service_url}"')
|
||||
else:
|
||||
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to token-associated incoming area of collection "{collection}" on "{service_url}"')
|
||||
continue
|
||||
|
||||
try:
|
||||
write_record(
|
||||
service_url=service_url,
|
||||
collection=collection,
|
||||
class_name=cls,
|
||||
class_name=class_name,
|
||||
record=record,
|
||||
token=token,
|
||||
session=session,
|
||||
)
|
||||
except HTTPError as e:
|
||||
console.print(
|
||||
f'[red]Error: writing record #{index} with pid {record["pid"]} failed[/red]: {e}: {e.response.text}',
|
||||
)
|
||||
_handle_http_error(failed, index, line, e)
|
||||
if ignore_errors:
|
||||
continue
|
||||
break
|
||||
except Exception as e:
|
||||
console.print(
|
||||
f'[red]Error: writing record #{index} with pid {record["pid"]} failed[/red]: {e}',
|
||||
)
|
||||
raise
|
||||
|
||||
if failed:
|
||||
click.echo('\n'.join(failed))
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
subcommand_name = 'post-records'
|
||||
def _handle_json_load_error(
    container: list,
    index: int,
    line: str,
    exc: Exception,
):
    """Report a JSON-decoding failure and record it in *container*.

    Prints a human-readable message to the console and appends a
    machine-readable JSON entry describing the failure, so the caller
    can emit a summary of all failed lines at the end of the run.
    """
    console.print(f'[red]Error[/red]: error loading line [red]#{index}[/red]: {line}')
    failure_entry = {
        'status': 'json_load_error',
        'index': index,
        'json': line,
        'detail': str(exc),
    }
    container.append(json.dumps(failure_entry))
|
||||
|
||||
|
||||
def _handle_http_error(
    container: list,
    index: int,
    line: str,
    exc: HTTPError,
):
    """Report a failed record upload and record it in *container*.

    Prints a console message and appends a JSON entry carrying the
    request URL plus the HTTP status and response body, so the caller
    can summarize all failures at the end of the run.
    """
    console.print(f'[red]Error[/red]: error writing line [red]#{index}[/red]: {line}')
    failure_entry = {
        'status': 'http_error',
        'index': index,
        'json': line,
        'url': exc.request.url,
        'http_status': exc.response.status_code,
        'http_response': exc.response.text,
    }
    container.append(json.dumps(failure_entry))
|
||||
|
||||
|
||||
def _handle_schema_type_error(
    container: list,
    index: int,
    record: dict,
):
    """Report a record whose class could not be inferred and record the failure.

    Used when the class name `*` was given but the record carries no
    usable `schema_type`. Appends a machine-readable JSON failure entry
    to *container* for the end-of-run summary.
    """
    console.print(f'[red]Error[/red]: no `schema type` in record [red]#{index}[/red], pid: [red]{record["pid"]}[/red]')
    failure_entry = {
        'status': 'class_reading_error',
        'pid': record['pid'],
        'record': record,
        'message': f'could not get class from record {record["pid"]}',
    }
    container.append(json.dumps(failure_entry))
|
||||
|
||||
|
||||
def get_class_from_record(record: dict) -> str | None:
|
||||
if 'schema_type' in record:
|
||||
return de_prefix(record['schema_type'])
|
||||
return None
|
||||
|
|
|
|||
9
dump_things_pyclient/conftest.py
Normal file
9
dump_things_pyclient/conftest.py
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
from dump_things_pyclient.tests.fixtures import (
|
||||
dump_stores_simple,
|
||||
dump_things_service,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
'dump_stores_simple',
|
||||
'dump_things_service',
|
||||
]
|
||||
123
dump_things_pyclient/tests/assets/pyclient_testschema.yaml
Normal file
123
dump_things_pyclient/tests/assets/pyclient_testschema.yaml
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
name: demo-research-information-schema
|
||||
description: |
|
||||
A minimal schema that is compatible with concepts.datalad.org's type
|
||||
definitions and annotations.
|
||||
id: https://concepts.datalad.org/s/test-schemas/dump-things-pyclient/unreleased
|
||||
version: UNRELEASED
|
||||
license: MIT
|
||||
prefixes:
|
||||
rdf:
|
||||
prefix_prefix: rdf
|
||||
prefix_reference: http://www.w3.org/1999/02/22-rdf-syntax-ns#
|
||||
rdfs:
|
||||
prefix_prefix: rdfs
|
||||
prefix_reference: http://www.w3.org/2000/01/rdf-schema#
|
||||
xsd:
|
||||
prefix_prefix: xsd
|
||||
prefix_reference: http://www.w3.org/2001/XMLSchema#
|
||||
test:
|
||||
prefix_prefix: test
|
||||
prefix_reference: http://www.example.org/test
|
||||
emit_prefixes:
|
||||
- xsd
|
||||
- test
|
||||
default_prefix: test
|
||||
types:
|
||||
string:
|
||||
name: string
|
||||
description: A character string
|
||||
from_schema: https://w3id.org/linkml/types
|
||||
base: str
|
||||
uri: xsd:string
|
||||
uriorcurie:
|
||||
name: uriorcurie
|
||||
description: a URI or a CURIE
|
||||
notes:
|
||||
- If you are authoring schemas in LinkML YAML, the type is referenced with the
|
||||
lower case "uriorcurie".
|
||||
from_schema: https://w3id.org/linkml/types
|
||||
base: URIorCURIE
|
||||
uri: xsd:anyURI
|
||||
repr: str
|
||||
NodeUriOrCurie:
|
||||
name: NodeUriOrCurie
|
||||
typeof: uriorcurie
|
||||
base: str
|
||||
uri: rdfs:Resource
|
||||
slots:
|
||||
family_name:
|
||||
name: family_name
|
||||
title: Family name
|
||||
range: string
|
||||
given_name:
|
||||
name: given_name
|
||||
title: Given name
|
||||
range: string
|
||||
annotations:
|
||||
name: annotations
|
||||
description: A record of properties of the metadata record on a subject, a collection
|
||||
of tag/text tuples with the semantics of OWL Annotation.
|
||||
title: Annotations
|
||||
range: Annotation
|
||||
multivalued: true
|
||||
inlined: true
|
||||
annotation_tag:
|
||||
name: annotation_tag
|
||||
description: A tag identifying an annotation.
|
||||
title: Tag
|
||||
range: Thing
|
||||
annotation_value:
|
||||
name: annotation_value
|
||||
description: The actual annotation.
|
||||
title: Value
|
||||
range: string
|
||||
pid:
|
||||
name: pid
|
||||
description: Persistent and globally unique identifier of a `Thing`.
|
||||
title: Persistent identifier
|
||||
identifier: true
|
||||
range: uriorcurie
|
||||
schema_type:
|
||||
name: schema_type
|
||||
slot_uri: rdf:type
|
||||
designates_type: true
|
||||
range: NodeUriOrCurie
|
||||
title:
|
||||
name: title
|
||||
range: string
|
||||
classes:
|
||||
Person:
|
||||
name: Person
|
||||
description: Person agents are people, alive, dead, or fictional.
|
||||
is_a: Thing
|
||||
slots:
|
||||
- family_name
|
||||
- given_name
|
||||
ThingMixin:
|
||||
name: ThingMixin
|
||||
mixin: true
|
||||
slots:
|
||||
- annotations
|
||||
- schema_type
|
||||
Thing:
|
||||
name: Thing
|
||||
mixins:
|
||||
- ThingMixin
|
||||
slots:
|
||||
- pid
|
||||
Annotation:
|
||||
name: Annotation
|
||||
slots:
|
||||
- annotation_tag
|
||||
- annotation_value
|
||||
slot_usage:
|
||||
annotation_tag:
|
||||
name: annotation_tag
|
||||
key: true
|
||||
range: AnnotationTag
|
||||
AnnotationTag:
|
||||
is_a: Thing
|
||||
Document:
|
||||
is_a: Thing
|
||||
slots:
|
||||
- title
|
||||
63
dump_things_pyclient/tests/common.py
Normal file
63
dump_things_pyclient/tests/common.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
from pathlib import Path
|
||||
from typing import Generator
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def read_records_from_store(
    store: Path,
    collection: str = 'collection_1',
    incoming: str | None = None,
    class_name: str = '*',
    remove_keys: list | None = None,
) -> Generator[tuple[Path, str | None, dict], None, None]:
    """Read records from a dumpthings-"recorddir" backend.

    By default records are read from the curated area. To read from the
    incoming area instead, set `incoming` to a glob expression matching
    the user IDs whose records should be read ('*' matches all user IDs).

    `class_name` is used as a glob expression for class directories. For
    example, `*Document` matches `XYZDocument` as well as `Document`; the
    default '*' matches all classes.

    If `remove_keys` is given, those keys are removed from each record
    before it is yielded, e.g. `['annotations']` strips annotations.

    Yields tuples of (record_path, user_id or None, record content).
    """
    store_config = yaml.safe_load((store / '.dumpthings.yaml').read_text())
    collection_config = store_config['collections'][collection]

    pattern = f'{class_name}/**/*.yaml'
    if incoming:
        base_dir = store / collection_config['incoming']
        pattern = f'{incoming}/' + pattern
    else:
        base_dir = store / collection_config['curated']

    drop_keys = set(remove_keys) if remove_keys else None
    for record_path in base_dir.glob(pattern):
        # The area-level config file also matches '*.yaml'; skip it.
        if record_path.name == '.dumpthings.yaml':
            continue
        # In the incoming area the first path component below the base
        # directory is the user ID; the curated area has no user IDs.
        user_id = record_path.parts[len(base_dir.parts)] if incoming else None
        content = yaml.safe_load(record_path.read_text())
        if drop_keys:
            content = {k: v for k, v in content.items() if k not in drop_keys}
        yield record_path, user_id, content
|
||||
115
dump_things_pyclient/tests/fixtures.py
Normal file
115
dump_things_pyclient/tests/fixtures.py
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from dump_things_service import config_file_name
|
||||
|
||||
|
||||
# String representation of curated- and incoming-path
|
||||
curated = 'curated'
|
||||
incoming = 'incoming'
|
||||
|
||||
# Path to a local simple test schema
|
||||
schema_path = Path(__file__).parent / 'assets' / 'pyclient_testschema.yaml'
|
||||
schema_path = schema_path.resolve()
|
||||
|
||||
# The global configuration file, all collections and
|
||||
# staging areas share the same directories. All tokens
|
||||
# of the same collection share an "incoming_label".
|
||||
global_config_text = f"""
|
||||
type: collections
|
||||
version: 1
|
||||
collections:
|
||||
collection_1:
|
||||
default_token: basic_access
|
||||
curated: {curated}
|
||||
incoming: {incoming}
|
||||
backend:
|
||||
type: record_dir
|
||||
auth_sources:
|
||||
- type: config
|
||||
submission_tags:
|
||||
submitter_id_tag: https://submitter.example.com
|
||||
submission_time_tag: https://time.example.com
|
||||
|
||||
tokens:
|
||||
basic_access:
|
||||
user_id: tester
|
||||
collections:
|
||||
collection_1:
|
||||
mode: WRITE_COLLECTION
|
||||
incoming_label: tester
|
||||
token-curator:
|
||||
user_id: test_curator
|
||||
collections:
|
||||
collection_1:
|
||||
mode: CURATOR
|
||||
incoming_label: test_curator
|
||||
"""
|
||||
|
||||
|
||||
collection_1_config = f"""type: records
|
||||
version: 1
|
||||
schema: {schema_path}
|
||||
format: yaml
|
||||
idfx: digest-md5
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
def dump_stores_simple(tmp_path_factory):
    """Create a minimal on-disk dumpthings store and return its root path.

    The store consists of the global service configuration at the root,
    an (initially empty) incoming area, and a curated area carrying the
    collection configuration. Created once per test session.
    """
    store_root = tmp_path_factory.mktemp('dump_store')
    (store_root / config_file_name).write_text(global_config_text)
    for area in (curated, incoming):
        (store_root / area).mkdir(parents=True, exist_ok=True)
    (store_root / curated / config_file_name).write_text(collection_1_config)
    return store_root
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
def dump_things_service(dump_stores_simple):
    """Run a `dump-things-service` instance for the duration of the session.

    Starts the service on an unused port, waits until it answers HTTP
    requests, and yields `(port, store_path)`. The server process is
    terminated at session end — and also if startup fails, so no orphan
    server process is left behind.
    """
    # Start a server on an unused port
    port = _get_unused_port()
    process_info = subprocess.Popen(
        ['dump-things-service', '--port', str(port), str(dump_stores_simple)],
    )

    try:
        # Wait until the server is responding to requests
        attempts = 0
        while True:
            time.sleep(1)
            try:
                result = requests.get(f'http://127.0.0.1:{port}/server')
                if result.status_code == 200:
                    break
                msg = f'dump-things-server responded with {result.status_code}: {result.text}'
                raise ValueError(msg)
            except requests.exceptions.ConnectionError:
                attempts += 1
                # Same threshold as before: give up after nine failed
                # connection attempts, but raise an informative error
                # instead of a bare `Exception`.
                if attempts >= 9:
                    msg = f'dump-things-service did not respond on port {port} after {attempts} attempts'
                    raise RuntimeError(msg) from None
    except BaseException:
        # Startup failed: do not leak the server process.
        os.kill(process_info.pid, signal.SIGTERM)
        os.waitpid(process_info.pid, 0)
        raise

    # Give the port and storage location to the user
    yield port, dump_stores_simple

    # Shut the server down
    os.kill(process_info.pid, signal.SIGTERM)
    os.waitpid(process_info.pid, 0)
|
||||
|
||||
|
||||
def _get_unused_port() -> int:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(('', 0))
|
||||
addr = s.getsockname()
|
||||
s.close()
|
||||
return addr[1]
|
||||
312
dump_things_pyclient/tests/test_auto_curate.py
Normal file
312
dump_things_pyclient/tests/test_auto_curate.py
Normal file
|
|
@ -0,0 +1,312 @@
|
|||
import json
|
||||
import posix
|
||||
import random
|
||||
import subprocess
|
||||
from itertools import chain
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
from dump_things_pyclient.commands.dtc import cli
|
||||
from dump_things_pyclient.tests.common import read_records_from_store
|
||||
from pygments.lexers import r
|
||||
|
||||
|
||||
prefix = 'https://www.example.com/ac_e2e_test/'
|
||||
|
||||
|
||||
def _create_unique_records(offset: int, count: int) -> dict[int, dict]:
    """Create *count* Person test records with unique numeric pids.

    The numeric pids are drawn without replacement from
    [offset, offset + 10000), so all returned records are distinct.
    Returns a mapping from numeric pid to record dict.
    """
    # `random.sample` draws distinct values directly; the previous
    # rejection-sampling loop could retry arbitrarily often.
    pids = random.sample(range(offset, offset + 10000), count)
    return {
        # NOTE: the order of keys is relevant because there are JSON-string
        # comparisons in the change-set tests below.
        pid: {
            'schema_type': 'test:Person',
            'pid': prefix + f'person_{pid}',
            'family_name': f'grieg_{pid}',
            'given_name': f'erwin_{pid}',
        }
        for pid in pids
    }
|
||||
|
||||
|
||||
def test_auto_curate_basic_end_to_end(dump_things_service):
    """Posted records move from the inbox to the curated area on auto-curation."""
    port, store = dump_things_service

    records = tuple(_create_unique_records(10000, 5).values())

    def curated_person_records():
        # All Person records currently in the curated area, annotations stripped.
        return tuple(
            entry[2]
            for entry in read_records_from_store(
                store, class_name='Person', remove_keys=['annotations'],
            )
        )

    # Add records to inbox
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in records
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Ensure that the records do not yet exist in the curated area
    before_curation = curated_person_records()
    for record in records:
        assert record not in before_curation, 'record already exists, possibly a random number collision'

    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'tester', f'http://127.0.0.1:{port}', 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'

    # Check that the inbox is empty
    inbox_records = tuple(
        entry[2] for entry in read_records_from_store(store, incoming='tester')
    )
    assert inbox_records == tuple(), 'Inbox not clean after auto-curation'

    # Check that the records are in the curated area
    after_curation = curated_person_records()
    for record in records:
        assert record in after_curation
|
||||
|
||||
|
||||
def test_auto_curate_create_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """End-to-end check of `auto-curate --create-change-set`.

    Seeds the curated area with records, uploads modified versions of
    those records plus entirely new records to the inbox, and lets
    `auto-curate` write a change set. The generated git repository must
    show one modification per record, carry the expected old/new content
    in its diffs, and store annotations separately.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('create_change_set')

    new_records = _create_unique_records(20000, 5)
    new_curated_records = _create_unique_records(30000, 5)

    # Add new curated records to inbox and auto-curate them to move them to
    # the curated area.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_curated_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'tester', f'http://127.0.0.1:{port}', 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'

    # Modify the records that were auto-curated and upload those together with
    # newly created records to the inbox.
    modified_curated_records = {
        # NOTE: the order of keys is relevant because there are JSON-string
        # comparisons in the change-set tests below.
        pid: {
            'schema_type': 'test:Person',
            'pid': record['pid'],
            'family_name': record['family_name'],
            'given_name': record['given_name'].replace('erwin', 'edvard'),
        }
        for pid, record in new_curated_records.items()
    }

    # Upload the modified (already curated) records and the new records to the
    # inbox.
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False)
            for record in chain(modified_curated_records.values(), new_records.values())
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Create a change set
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'tester',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'

    # Check the number of modified records in the change set
    result = subprocess.run(
        ['git', 'status', '-s'],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [
        line.strip()
        for line in result.stdout.decode().splitlines()
        if 'annotations' not in line
    ]
    assert len(lines) == len(new_records) + len(modified_curated_records), f'unexpected number of modified records: {len(lines)}'
    assert all(l.startswith('M records/tester') for l in lines), f'unexpected status for modified records: {lines}'

    # Check for expected diff content
    result = subprocess.run(
        ['git', 'diff', '-p'],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [line.strip() for line in result.stdout.decode().splitlines()]
    # Every diff should be seven lines long, the line at index 5 contains the
    # previous content, the line at index 6 contains the new content.
    for patch in range(len(lines) // 7):
        old = json.loads(lines[(7 * patch) + 5][1:])
        new = json.loads(lines[(7 * patch) + 6][1:])
        pid = int(lines[7 * patch][-5:])
        if pid in new_records:
            # Newly added records have no previous content.
            assert old is None
            assert new == new_records[pid]
        else:
            assert old == new_curated_records[pid]
            assert new == modified_curated_records[pid]

    # Check that annotations are stored in the change set
    annotations = {
        int(p.name[-5:]): json.loads(p.read_text())
        for p in (change_set_dir / 'annotations').glob('tester/*')
    }
    assert len(annotations) == len(modified_curated_records) + len(new_curated_records)
|
||||
|
||||
|
||||
def test_auto_curate_post_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """End-to-end check of `auto-curate --post-change-set`.

    Creates a change set from inbox records, posts it once without and
    once with annotations, and verifies the curated area after each step.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('post_change_set')

    new_records = _create_unique_records(40000, 5)

    def read_curated_persons():
        # All Person records currently in the curated area.
        return tuple(
            entry[2]
            for entry in read_records_from_store(store, class_name='Person')
        )

    # Add new records to inbox of 'tester'.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'

    # Create a change set with the records from 'tester's' inbox
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'tester',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'

    # Post the changeset without annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set failed'

    # Check that new records have been posted without annotations
    curated_records = read_curated_persons()
    for record in new_records.values():
        assert record in curated_records
    for record in curated_records:
        try:
            record_pid = int(record['pid'][-5:])
        except ValueError:
            # Records from other tests may not end in a numeric pid.
            continue
        if record_pid in new_records:
            assert 'annotations' not in record, f'unexpected annotations in {record}'

    # Post the changeset with annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            '--add-annotations',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set --add-annotations failed'

    # Check that new records have been posted with annotations
    curated_records = read_curated_persons()

    # Check that all record content has been posted
    cleaned_curated_records = tuple(
        {
            'schema_type': rec['schema_type'],
            'pid': rec['pid'],
            'family_name': rec['family_name'],
            'given_name': rec['given_name'],
        }
        for rec in curated_records
    )
    for record in new_records.values():
        assert record in cleaned_curated_records

    # Check that annotations were posted.
    annotations = {
        p.name.replace('-_', '/').replace('--', '-'): json.loads(p.read_text())
        for p in change_set_dir.glob('annotations/tester/*')
    }
    for record in curated_records:
        try:
            record_pid = int(record['pid'][-5:])
        except ValueError:
            continue
        if record_pid in new_records:
            assert 'annotations' in record, f'missing annotations in {record}'
            assert record['annotations'] == annotations[record['pid']]
|
||||
12
dump_things_pyclient/tests/test_basic.py
Normal file
12
dump_things_pyclient/tests/test_basic.py
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
import json
|
||||
|
||||
import yaml
|
||||
from click.testing import CliRunner
|
||||
|
||||
from dump_things_pyclient.commands.dtc import cli
|
||||
|
||||
|
||||
def test_dtc_version():
    """The `dtc version` subcommand runs and exits successfully."""
    outcome = CliRunner().invoke(cli, ['version'])
    assert outcome.exit_code == 0
|
||||
199
dump_things_pyclient/tests/test_post_records.py
Normal file
199
dump_things_pyclient/tests/test_post_records.py
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
import json
|
||||
|
||||
import yaml
|
||||
from click.testing import CliRunner
|
||||
|
||||
from dump_things_pyclient.commands.dtc import cli
|
||||
from dump_things_pyclient.tests.common import read_records_from_store
|
||||
|
||||
|
||||
def test_dtc_post_get_end_to_end(dump_things_service):
    """Records posted to the inbox are annotated on disk and retrievable via get-records."""
    port, store = dump_things_service

    records = tuple(
        {"pid": f"test:person_{i}", "given_name": f"gustav_{i}", "family_name": f"maler_{i}"}
        for i in range(5)
    )

    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in records
        ) + '\n'
    )
    assert result.exit_code == 0

    # Read all records from disk, check for annotations.
    for _, _, disk_record in read_records_from_store(store, incoming='tester', class_name='Person'):
        if disk_record['given_name'].startswith('gustav'):
            assert 'annotations' in disk_record

    # Ensure that all records from `records` are on disk
    cleaned_disk_records = tuple(
        entry[2]
        for entry in read_records_from_store(
            store,
            incoming='tester',
            class_name='Person',
            remove_keys=['annotations', 'schema_type'],
        )
    )
    assert all(rec in cleaned_disk_records for rec in records)

    # Check via get-records, this might return more than the records we just posted
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'get-records', f'http://127.0.0.1:{port}', 'collection_1'
        ],
    )
    assert result.exit_code == 0
    returned_records = tuple(json.loads(line) for line in result.output.splitlines())

    # Due to other tests, there might be more records in the server than the
    # ones that we just uploaded
    assert len(returned_records) >= len(records)

    # This assertion relies on the fact that the result of get-records is
    # ordered by PID. This is the case because the server does order the
    # results by PID (by default).
    cleaned_records = tuple(
        {k: v for k, v in rec.items() if k not in ('annotations', 'schema_type')}
        for rec in returned_records
    )

    for r in records:
        assert r in cleaned_records, f'record {r} not found in cleaned records'
|
||||
|
||||
|
||||
def test_dtc_post_curated(dump_things_service):
    """Records posted with `--curated` land verbatim in the curated area."""
    port, store = dump_things_service

    # We add `schema_type` to the test record because the server API code, i.e.,
    # the fastapi pydantic interface classes that are generated via linkml
    # add it automatically. The curated interface does never use a schema_type
    # layer, so `schema_type` will end up in the persisted record. Adding it
    # to our input record keeps the input and output records consistent, and
    # we don't have to care about the schema_type attribute when comparing
    # uploaded and downloaded records.
    records = tuple(
        {
            "pid": f"test:person_curated_{i}",
            "given_name": f"hans_{i}",
            "family_name": f"post_curated_{i}",
            'schema_type': 'test:Person',
        }
        for i in range(5)
    )

    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'post-records', '--curated', f'http://127.0.0.1:{port}', 'collection_1', 'Person'
        ],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in records
        ) + '\n'
    )
    assert result.exit_code == 0

    # Load every Person record that is stored in the curated area.
    curated_dir = store / 'curated' / 'Person'
    disk_records = tuple(
        yaml.safe_load(yaml_file.read_text())
        for yaml_file in curated_dir.glob('**/*.yaml')
    )

    # Check that curated access does not add annotations
    for record in disk_records:
        if record['given_name'].startswith('hans_'):
            assert 'annotations' not in record

    # Check that all uploaded records appear verbatim on disk, i.e., that they
    # were really uploaded through the `curated`-interface.
    assert all(rec in disk_records for rec in records)

    # Check via get-records, this might return more than the records we just posted
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'get-records',
            '--class', 'Person',
            f'http://127.0.0.1:{port}', 'collection_1'
        ],
    )
    assert result.exit_code == 0
    returned_records = tuple(json.loads(line) for line in result.output.splitlines())
    assert len(returned_records) >= len(records)
    assert all(rec in returned_records for rec in records)
|
||||
|
||||
|
||||
def _cleaned_inbox_records(store, user_id, class_name):
    """Load all *class_name* records from *user_id*'s inbox, annotations stripped."""
    inbox = store / 'incoming' / user_id / class_name
    disk_records = tuple(
        yaml.load(yaml_file.read_text(), Loader=yaml.SafeLoader)
        for yaml_file in inbox.glob('**/*.yaml')
    )
    return tuple(
        {k: v for k, v in r.items() if k not in ('annotations',)}
        for r in disk_records
    )


def test_dtc_post_record_any_class(dump_things_service):
    """`post-records` with class `*` infers each record's class from `schema_type`."""
    port, store = dump_things_service

    person_records = tuple(
        {
            "pid": f"test:person_wildcard_{i}",
            "given_name": f"johann_{i}",
            "family_name": f"post_wildcard_class_{i}",
            'schema_type': 'test:Person',
        }
        for i in range(3)
    )
    document_records = tuple(
        {
            "pid": f"test:document_wildcard_{i}",
            "title": f"document_{i}",
            'schema_type': 'test:Document',
        }
        for i in range(3)
    )

    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'post-records',
            f'http://127.0.0.1:{port}', 'collection_1', '*',
        ],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False)
            for record in person_records + document_records
        ) + '\n'
    )
    assert result.exit_code == 0

    # Check that the records were associated with the correct class on disk.
    # (This checks the interpretation of the schema_type attribute in the
    # input records and the interface implementation). The previously
    # duplicated read-and-clean logic lives in `_cleaned_inbox_records`.
    person_on_disk = _cleaned_inbox_records(store, 'tester', 'Person')
    assert all(r in person_on_disk for r in person_records)

    documents_on_disk = _cleaned_inbox_records(store, 'tester', 'Document')
    assert all(r in documents_on_disk for r in document_records)
|
||||
|
|
@ -8,10 +8,10 @@ authors = [
|
|||
{name="Christian Mönch", email="christian.moench@web.de"},
|
||||
]
|
||||
dependencies = [
|
||||
"click>=8.3.1",
|
||||
"pyyaml>=6.0.3",
|
||||
"requests>=2.32.5",
|
||||
"rich-click>=1.9.6",
|
||||
"click",
|
||||
"pyyaml",
|
||||
"requests",
|
||||
"rich-click",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
|
@ -19,6 +19,15 @@ ttl = [
|
|||
"dump-things-service>=5.3.0",
|
||||
]
|
||||
tests = [
|
||||
"dump-things-service>=5.3.0",
|
||||
"pytest>=9.0.1",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
tests = [
|
||||
"linkml>=1.10.0",
|
||||
"linkml-runtime>=1.10.0",
|
||||
"dump-things-service>=5.3.0",
|
||||
"pytest>=9.0.1",
|
||||
]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue