Add --create-change-set and --post-change-set option to dtc-subcommand auto-curate #34

Merged
cmo merged 39 commits from change-set-2 into master 2026-03-10 01:14:09 +00:00
17 changed files with 2322 additions and 809 deletions

View file

@ -0,0 +1,31 @@
name: Test execution
on: [push]
jobs:
  Test-all:
    runs-on: ubuntu-latest
    steps:
      - name: Check out repository code
        uses: actions/checkout@v4
      - name: Set up git
        # A git identity is required because the test suite creates commits.
        # Note: `run :` (with a space before the colon) was normalized to the
        # conventional `run:`.
        run: |
          git config --global user.email "tester@example.com"
          git config --global user.name "Automated Tester"
      - name: Set up Python
        # NOTE(review): no `python-version` is given, so the runner's default
        # Python is used — confirm this is intended (the repo's
        # `.python-version` file was removed in this change).
        uses: actions/setup-python@v5
      - name: Install uv
        uses: astral-sh/setup-uv@v6
      - name: Create virtual environment
        run: |
          uv venv
      - name: Install this package
        run: |
          uv pip install .
      - name: Run tests
        run: |
          uv run --extra tests pytest

View file

@ -1 +0,0 @@
3.13

View file

@ -1,3 +1,25 @@
# 0.2.13 (2026-03-03)
## New features
- The `dtc` subcommand `post-records` now accepts the class name `*`. If this
class name is given, it will try to infer the class name from the
`schema_type` property of the input records. `post-records` also got the new
options `--ignore-errors` and `--dry-run`.
- The `dtc` subcommand `auto-curate` has the new options
`--create-change-set CHANGE_SET_DIR` and `--post-change-set`. With
`--create-change-set`, `auto-curate` will generate a
git repository in `CHANGE_SET_DIR` that contains the changes that
would be introduced by the curation in the git worktree. The records in the
change set have no `annotations`-property; annotations are stored separately
in the change set. If `--post-change-set` is provided,
`auto-curate` will post the records from the change set to the curated area
of the service. With
the option `--add-annotations` the annotations that were read from the inbox
will be added to the records before they are posted to the service.
# 0.2.12 (2026-02-11)
## New features
@ -10,7 +32,6 @@
- Ensure that all `dtc` subcommands return a non-zero exit status on error.
## Improvements
- Improve help messages of `dtc` subcommands `delete-records` and `import`.

View file

@ -1,22 +1,23 @@
import json
import logging
import re
import subprocess
import sys
from itertools import (
count,
chain,
)
from itertools import chain
from pathlib import Path
from typing import (
cast,
Iterable,
)
import rich_click as click
from requests import Session
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
curated_read_record_with_pid,
curated_write_record,
get_session,
incoming_delete_record,
@ -25,6 +26,8 @@ from ...communicate import (
)
subcommand_name = 'auto-curate'
logger = logging.getLogger('auto-curate')
console = Console(file=sys.stderr)
@ -56,6 +59,42 @@ stl_info = False
metavar='DEST_TOKEN',
help='if provided, this token will be used the authenticate against DEST_SERVICE_URL, which defaults to SERVICE_URL (the default is the token provided via --token)',
)
@click.option(
'--create-change-set',
metavar='CHANGE_SET_DIR',
help='write a change set to the specified directory instead of modifying data on a server. A change set contains all changes that would be performed in the destination collection (as git-repo with modifications). '
'`annotations` are not included in the modifications, but are stored separately in the change set directory. If the directory does not exist, it will be created. Specifying an existing directory that is not empty is an error',
type=click.Path(
exists=False,
file_okay=False,
dir_okay=True,
writable=True,
allow_dash=False,
path_type=Path,
),
)
@click.option(
'--post-change-set',
metavar='CHANGE_SET_DIR',
help='read the change set at CHANGE_SET_DIR and post it to the curated area of the given destination server and collection. '
'If `--add-annotations` is provided in addition, the annotations that were recorded in the records in the incoming areas '
'are added to the respective records before posting them to the destination (--post-change-set and '
'--create-change-set are mutually exclusive)',
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
allow_dash=False,
path_type=Path,
),
)
@click.option(
'--add-annotations',
help='add annotations when posting a change set via `--post-change-set`. By default the annotations will not be added to the records that are posted. If --post-change-set is not provided, this flag is ignored',
default=False,
is_flag=True,
)
@click.option(
'--pid', '-p',
metavar='PID',
@ -63,11 +102,13 @@ stl_info = False
)
@click.option(
'--exclude', '-e',
metavar='INBOX_LABEL',
help='exclude an inbox on the source collection (repeatable)',
multiple=True,
)
@click.option(
'--include', '-i',
metavar='INBOX_LABEL',
help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
multiple=True,
)
@ -96,6 +137,9 @@ def cli(
destination_service_url,
destination_collection,
destination_token,
create_change_set,
post_change_set,
add_annotations,
pid,
exclude,
include,
@ -108,6 +152,14 @@ def cli(
collection, or to the curated area of another collection, possibly on
another service.
`auto-curate` can create change sets that reflect what changes would be
applied to the destination, if records in inboxes were posted to the
destination. The changes are reflected as worktree diff in a git
repository (annotations are excluded from the diff).
`auto-curate` can also read records from a change set and post them
to a service. If required, annotations can be re-added before the records
from the change set are posted to the service.
A token is required and will be used to authenticate the requests.
The token must have curator-rights."""
try:
@ -119,6 +171,9 @@ def cli(
destination_service_url,
destination_collection,
destination_token,
create_change_set,
post_change_set,
add_annotations,
pid,
exclude,
include,
@ -141,6 +196,9 @@ def auto_curate(
destination_service_url,
destination_collection,
destination_token,
create_change_set,
post_change_set,
add_annotations,
pid,
exclude,
include,
@ -154,6 +212,35 @@ def auto_curate(
console.print(f'[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)')
return 1
if create_change_set:
if post_change_set:
console.print(f'[red]Error[/red]: --create-change-set and --post-change-set must not be specified together')
return 1
if list_labels or list_records:
console.print(f'[red]Error[/red]: --create-change-set cannot be combined with --list-labels or --list-records')
return 1
if create_change_set.exists():
if tuple(create_change_set.glob('*')) != tuple():
console.print(f'[red]Error[/red]: {create_change_set} already exists and is not empty')
return 1
else:
if dry_run:
console.print(f'[DRY_RUN]:CREATE DIRECORY: {create_change_set}')
else:
create_change_set.mkdir()
if dry_run:
console.print(f'[DRY_RUN]:INITIALIZING GIT REPO: {create_change_set}')
else:
subprocess.run(['git', 'init', str(create_change_set)], check=1)
if post_change_set:
if list_labels or list_records:
console.print(f'[red]Error[/red]: --post-change-set cannot be combined with --list-labels or --list-records')
return 1
if destination_collection is None:
destination_collection = collection
@ -163,26 +250,32 @@ def auto_curate(
if destination_token is None:
destination_token = curator_token
session = get_session()
output = None
# If --list-labels and --list-records are provided, keep only the latter,
# because it includes listing of labels
if list_records:
if list_labels:
logger.warning('`-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
console.print(f'[yellow]Warning[/yellow]: `-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
list_labels = False
output = {}
if list_labels:
output = []
session = get_session()
# Get the labels from the service, or from a change set that should be posted.
if post_change_set:
all_labels = _get_change_set_labels(post_change_set)
else:
all_labels = incoming_read_labels(
service_url=service_url,
collection=collection,
token=obj,
session=session,
)
for label in all_labels:
if include and label not in include:
logger.debug('ignoring non-included incoming label: %s', label)
@ -196,10 +289,18 @@ def auto_curate(
output.append(label)
continue
if list_records:
output[label] = []
# Get the records from the service, or from a change set that should
# be posted.
if post_change_set:
record_source = _get_change_set_records(post_change_set, label)
final_record_source = track(
record_source,
description=f'processing [green]{label}[/green]',
total=len(record_source),
console=console,
)
# Get the total number of entries for the
else:
record_source = incoming_read_records(
service_url=service_url,
collection=collection,
@ -208,30 +309,98 @@ def auto_curate(
session=session,
)
# Get the first entry to find the total number of records
# Get the first entry to find the total number of records in the
# incoming area `label`.
try:
first_record, _, _, _, total = next(record_source)
except StopIteration:
console.print(f'no records in incoming area [green]{label}[/green], skipping it')
continue
# Get the first entry an all other entries
for index, (record, _, _, _, total) in track(
zip(
count(),
# Create a source that returns the first entry, all remaining entries,
# and tracks progress.
final_record_source = track(
chain(
[(first_record, None, None, None, None)],
cast(Iterable, record_source),
),
),
description=f'processing [green]{label}[/green]',
total=total,
console=console,
):
if list_records:
output[label].append(record)
)
if create_change_set:
result = _update_change_set(
source=final_record_source,
change_set=create_change_set,
label=label,
destination_service_url=destination_service_url,
destination_collection=destination_collection,
destination_token=destination_token,
pid=pid,
dry_run=dry_run,
session=session,
)
if result != 0:
return result
elif list_records:
output[label] = [record for record, _, _, _, _, in final_record_source]
continue
elif post_change_set:
result = _post_change_set(
final_record_source,
post_change_set=post_change_set,
label=label,
destination_service_url=destination_service_url,
destination_collection=destination_collection,
destination_token=destination_token,
pid=pid,
add_annotations=add_annotations,
dry_run=dry_run,
session=session,
)
if result != 0:
return result
else:
result = _curate_records(
final_record_source,
service_url=service_url,
collection=collection,
label=label,
curator_token=curator_token,
destination_service_url=destination_service_url,
destination_collection=destination_collection,
destination_token=destination_token,
pid=pid,
dry_run=dry_run,
session=session,
)
if result != 0:
return result
if output is not None:
click.echo(json.dumps(output, ensure_ascii=False))
return 0
def _curate_records(
source: Iterable,
service_url: str,
collection: str,
label: str,
curator_token: str,
destination_service_url: str,
destination_collection: str,
destination_token: str,
pid: str | None,
dry_run: bool,
session: Session,
) -> int:
for record, _, _, _, _ in source:
if pid:
if record['pid'] not in pid:
logger.debug(
@ -239,33 +408,14 @@ def auto_curate(
record['pid'])
continue
# Get the class name from the `schema_type` attribute. This requires
# that the schema type is either stored in the record or that the
# store has a "Schema Type Layer", i.e., the store type is
# `record_dir+stl`, or `sqlite+stl`.
try:
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
except (IndexError, KeyError):
global stl_info
console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record["pid"]}[/yellow] because `schema_type` attribute is missing.')
if not stl_info:
console.print(
' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
'"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
'storage and will not be returned when retrieving records from the incoming area. '
'dump-things-service <= 5.4.0 circumvented the "Schema Type Layer", therefore they '
'will return records without `schema_type` attributes on curator access to '
'incoming areas or curated areas. Therefore it might be a good idea to NOT use a '
'"Schema Type Layer" in collections that shall be auto-curated, when using '
'dump-things-service <= 5.4.0.[/yellow]',
)
stl_info = True
class_name = _get_class_name(record)
if class_name is None:
console.print(f'[yellow]Warning[/yellow]: could not determine class in record [yellow]{record["pid"]}[/yellow], ignoring it.')
continue
if dry_run:
console.print(f'WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
console.print(f'DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
console.print(f'[DRY_RUN]:DELETE record [green]"{record["pid"]}"[/green] from inbox "{label}" of collection "{collection}" on "{service_url}"')
continue
# Store record in destination collection
@ -299,11 +449,215 @@ def auto_curate(
f'[red]Error[/red]: deleting record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
)
return 1
return 0
if output is not None:
click.echo(json.dumps(output, ensure_ascii=False))
def _get_class_name(record: dict) -> str | None:
    """Get the class name from the `schema_type` attribute of `record`.

    This requires that the schema type is either stored in the record or that
    the store has a "Schema Type Layer", i.e., the store type is
    `record_dir+stl`, or `sqlite+stl`.

    Returns the class name, or `None` if `schema_type` is missing (a warning
    is printed in that case, and a detailed explanation is printed once per
    process run).
    """
    try:
        # The regex captures the trailing run of word characters, i.e. the
        # bare class name after a prefix such as `test:` in `test:Person`.
        return re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
    except (IndexError, KeyError):
        # `KeyError` is raised when `record` has no `schema_type` attribute.
        global stl_info
        console.print(f'[yellow]Warning[/yellow]: ignoring record with pid [yellow]{record["pid"]}[/yellow] because `schema_type` attribute is missing.')
        if not stl_info:
            # Print the verbose "Schema Type Layer" explanation only once per
            # run; `stl_info` is a module-level flag tracking that.
            console.print(
                ' [yellow]Please ensure that `schema_type` is stored in the records. Note: '
                'if the incoming area store has a backend with a "Schema Type Layer", i.e., '
                '"record_dir+stl" or "sqlite+stl", `schema_type` will not be stored on persistent '
                'storage and will not be returned when retrieving records from the incoming area.'
                'dump-things-service circumvents the "Schema Type Layer" on curator access, i.e., '
                'on access to incoming areas and on access to the curated area. Therefore it '
                'might return records without `schema_type` attributes on curator access to '
                'incoming areas or curated areas. To be consistent, it might be a good idea to '
                'NOT use a "Schema Type Layer" in collections that shall be auto-curated, but '
                'store `schema_type`-attributes in all records.[/yellow]',
            )
            stl_info = True
        return None
def _update_change_set(
    source: Iterable,
    change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    pid: str | None,
    dry_run: bool,
    session: Session,
) -> int:
    """Record, as a git worktree diff, the changes posting `source` would cause.

    For every record yielded by `source` (inbox `label`), the currently
    curated version of the record is fetched from the destination, written
    (without its `annotations`) to the change-set repository and staged with
    `git add`; the file is then overwritten with the incoming record (also
    without `annotations`, which are written to a separate annotations
    directory). A final `git commit` commits the staged curated state, so
    the remaining worktree diff shows exactly what curation would change.

    Records whose pid does not match `pid` (if given) or whose class cannot
    be determined are skipped. With `dry_run`, every file-system and git
    operation is only printed. Returns 0.
    """
    label_dir = change_set / 'records' / label
    annotation_dir = change_set / 'annotations' / label
    if dry_run:
        console.print(f'[DRY_RUN]:CREATING LABEL DIRECTORY: {label_dir}')
    else:
        label_dir.mkdir(parents=True, exist_ok=True)
    if dry_run:
        console.print(f'[DRY_RUN]:CREATING ANNOTATION DIRECTORY: {annotation_dir}')
    else:
        annotation_dir.mkdir(parents=True, exist_ok=True)
    for record, _, _, _, _ in source:
        if pid:
            if record['pid'] not in pid:
                logger.debug(
                    'ignoring record with non-matching pid: %s',
                    record['pid'])
                continue
        class_name = _get_class_name(record)
        if class_name is None:
            continue
        file_path = label_dir / _pid_to_filename(record['pid'])
        annotation_path = annotation_dir / _pid_to_filename(record['pid'])
        # Fetch the current curated record from the destination, delete its
        # annotations, write it to a file, and add the file to git (this
        # stages the *curated* content in the git index).
        existing_record = curated_read_record_with_pid(
            service_url=destination_service_url,
            collection=destination_collection,
            pid=record['pid'],
            token=destination_token,
            session=session,
        )
        if existing_record:
            existing_record.pop('annotations', None)
            if dry_run:
                console.print(f'[DRY_RUN]:STORE existing record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{file_path.resolve()}"')
            else:
                file_path.write_text(
                    json.dumps(existing_record, ensure_ascii=False) + '\n',
                    encoding='utf8'
                )
        else:
            # No curated version exists yet: stage the JSON literal `null`
            # as the "before" state.
            if dry_run:
                console.print(f'[DRY_RUN]:STORE "null" at "{file_path.resolve()}"')
            else:
                file_path.write_text('null\n')
        intra_repo_file_path = file_path.relative_to(change_set)
        if dry_run:
            console.print(f'[DRY_RUN]:GIT ADD {intra_repo_file_path} (cdw={change_set})')
        else:
            subprocess.run(['git', 'add', str(intra_repo_file_path)], cwd=change_set, check=True)
        # Write the new record without annotations to disk, overwriting the
        # staged curated version in the worktree only (the index keeps the
        # curated content that was `git add`-ed above).
        annotations = record.pop('annotations', None)
        if dry_run:
            console.print(f'[DRY_RUN]:STORE new record [green]"{record["pid"]}"[/green] of class "{class_name}" from inbox {label} at "{file_path.resolve()}"')
        else:
            file_path.write_text(
                json.dumps(record, ensure_ascii=False) + '\n',
                encoding='utf8'
            )
        if dry_run:
            console.print(f'[DRY_RUN]:STORE annotations for record [green]"{record["pid"]}"[/green] of class "{class_name}" from curated area at "{annotation_path.resolve()}"')
        else:
            # Writes the JSON literal `null` when the record had no
            # annotations.
            annotation_path.write_text(
                json.dumps(annotations, ensure_ascii=False) + '\n',
                encoding='utf8'
            )
    # Commit the staged curated state once after all records were processed;
    # the worktree now differs from HEAD by exactly the incoming changes.
    if dry_run:
        console.print(f'[DRY_RUN]:GIT COMMIT in {change_set}')
    else:
        subprocess.run(['git', 'commit', '-m', 'commit curated state'], cwd=change_set, check=True)
    return 0
def _pid_to_filename(pid: str) -> str:
return pid.replace('-', '--').replace('/', '-_')
def _post_change_set(
    source: Iterable,
    post_change_set: Path,
    label: str,
    destination_service_url: str,
    destination_collection: str,
    destination_token: str,
    pid: str | None,
    add_annotations: bool,
    dry_run: bool,
    session: Session,
) -> int:
    """Post the records of inbox `label` from a change set to the destination.

    `source` yields the record file names under
    `<post_change_set>/records/<label>`. Each file is parsed as JSON; with
    `add_annotations`, the matching file under
    `<post_change_set>/annotations/<label>` is parsed and attached as the
    record's `annotations` property. Files with unparsable JSON or without a
    determinable class are skipped with a warning. With `dry_run`, writes are
    only printed.

    Returns 0 on success, 1 when writing a record to the destination fails.
    """
    pid_file_name = _pid_to_filename(pid) if pid else None
    for file_name in source:
        if pid_file_name and pid_file_name != file_name:
            # Fix: log the skipped file name (previously this logged
            # `pid_file_name`, i.e. the filter value, for every skipped file).
            logger.debug('ignoring file with non-matching pid: %s', file_name)
            continue
        try:
            record_file = post_change_set / 'records' / label / file_name
            record = json.loads(record_file.read_text())
        except json.JSONDecodeError as jde:
            console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{record_file}[/yellow], ignoring it.')
            continue
        # NOTE(review): a file containing the JSON literal `null` (the
        # "no curated version" marker written by `_update_change_set`) loads
        # as `None` and would raise below — confirm such files cannot appear
        # in the worktree of a posted change set.
        if add_annotations:
            annotation_file = post_change_set / 'annotations' / label / file_name
            try:
                annotations = json.loads(annotation_file.read_text())
            except json.JSONDecodeError as jde:
                console.print(f'[yellow]Warning[/yellow]: JSON error: {jde} at [yellow]{annotation_file}[/yellow], ignoring record.')
                continue
            record['annotations'] = annotations
        class_name = _get_class_name(record)
        if class_name is None:
            console.print(f'[yellow]Warning[/yellow]: could not determine class of record [yellow]{record["pid"]}[/yellow], ignoring it.')
            continue
        if dry_run:
            console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to collection "{destination_collection}" on "{destination_service_url}"')
            continue
        # Store record in destination collection
        try:
            curated_write_record(
                service_url=destination_service_url,
                collection=destination_collection,
                class_name=class_name,
                record=record,
                token=destination_token,
                session=session,
            )
        except HTTPError as e:
            console.print(
                f'[red]Error[/red]: writing record with pid [green]{record["pid"]}[/green] failed: {e}: {e.response.text}',
            )
            return 1
    return 0
subcommand_name = 'auto-curate'
def _get_change_set_labels(
change_set: Path,
) -> tuple[str]:
return tuple(
map(
lambda match: match.name,
change_set.glob(f'records/*')
)
)
def _get_change_set_records(
change_set: Path,
label: str,
) -> tuple[str]:
return tuple(
map(
lambda match: match.name,
change_set.glob(f'records/{label}/*')
)
)

View file

@ -0,0 +1,4 @@
def de_prefix(name: str) -> str:
    """Strip a leading `prefix:` part from `name`, if one is present.

    Only the first colon is significant; everything after it is returned
    unchanged. Names without a colon are returned as-is.
    """
    _, separator, suffix = name.partition(':')
    return suffix if separator else name

View file

@ -22,6 +22,7 @@ from ...communicate import (
incoming_read_records,
server,
)
from .common.prefix import de_prefix
subcommand_name = 'export'
@ -243,7 +244,7 @@ def _store_records(
continue
break
class_name = _de_prefix(schema_type)
class_name = de_prefix(schema_type)
if not keep_schema_type:
del record['schema_type']
@ -265,12 +266,6 @@ def _store_records(
return failed
def _de_prefix(
name: str,
):
return name.split(':', 1)[-1]
def _get_hex_digest(
data: str,
) -> str:

View file

@ -13,8 +13,11 @@ from ...communicate import (
collection_write_record,
get_session,
)
from .common.prefix import de_prefix
subcommand_name = 'post-records'
logger = logging.getLogger('post-records')
console = Console(file=sys.stderr)
@ -40,18 +43,35 @@ console = Console(file=sys.stderr)
is_flag=True,
help='store record directly in curated area instead of an inbox. (Note: requires a token with curator rights)'
)
@click.option(
'--ignore-errors',
default=False,
is_flag=True,
help='ignore errors when posting a record and continue with remaining records',
)
@click.option(
'--dry-run', '-d',
help='if provided, do not alter any data, instead print what would be done',
default=False,
is_flag=True,
)
def cli(
obj,
service_url,
collection,
cls,
curated,
ignore_errors,
dry_run,
):
"""Read records of class CLASS from standard input and store them in
the collection COLLECTION on the service SERVICE_URL. Records should be
provided in JSON-lines format. Note: all records are assumed to be of class
CLASS. To submit records of multiple classes, the subcommand has to be
invoked multiple times, once for each class.
provided in JSON-lines format. Note: if CLASS is `*` post-records will try
to infer the class from a `schema_type` property in the input records.
Otherwise, all records are
assumed to be of class CLASS. In this case, to submit records of multiple
classes, the
subcommand has to be invoked multiple times, once for each class.
If the `--curated`-option is provided, the records will be stored directly
in the curated area of the collection without any alterations, i.e, no
@ -62,6 +82,9 @@ def cli(
annotated with the submission time and the user that performed
the submission.
If any record could not be posted, error information about the failing pids
will be written to stdout in JSON Lines format (one line per faulty record).
A token is required and will be used to authenticate the requests.
If the `--curated`-option is provided, the token must have
curator-rights."""
@ -72,6 +95,8 @@ def cli(
collection,
cls,
curated,
ignore_errors,
dry_run,
)
)
@ -82,6 +107,8 @@ def post_records(
collection,
cls,
curated,
ignore_errors,
dry_run,
) -> int:
token = obj
if token is None:
@ -93,35 +120,120 @@ def post_records(
else:
write_record = collection_write_record
failed = []
session = get_session()
for index, line in zip(count(), track(sys.stdin, console=console)):
try:
record = json.loads(line)
except Exception as e:
console.print(
f'[red]Error: reading JSON record #{index} failed[/red]: {e}',
)
_handle_json_load_error(failed, index, line, e)
if ignore_errors:
continue
break
if cls == '*':
class_name = get_class_from_record(record)
if class_name is None:
_handle_schema_type_error(failed, index, record)
if ignore_errors:
continue
break
else:
class_name = cls
if dry_run:
if curated:
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to curated area of collection "{collection}" on "{service_url}"')
else:
console.print(f'[DRY_RUN]:WRITE record [green]"{record["pid"]}"[/green] of class "{class_name}" to token-associated incoming area of collection "{collection}" on "{service_url}"')
continue
try:
write_record(
service_url=service_url,
collection=collection,
class_name=cls,
class_name=class_name,
record=record,
token=token,
session=session,
)
except HTTPError as e:
console.print(
f'[red]Error: writing record #{index} with pid {record["pid"]} failed[/red]: {e}: {e.response.text}',
)
_handle_http_error(failed, index, line, e)
if ignore_errors:
continue
break
except Exception as e:
console.print(
f'[red]Error: writing record #{index} with pid {record["pid"]} failed[/red]: {e}',
)
raise
if failed:
click.echo('\n'.join(failed))
return 1
return 0
subcommand_name = 'post-records'
def _handle_json_load_error(
    container: list,
    index: int,
    line: str,
    exc: Exception,
):
    """Report a JSON-parse failure for input line `index` and record it.

    A warning goes to the console; a JSON-encoded error description is
    appended to `container` for later emission on stdout.
    """
    console.print(f'[red]Error[/red]: error loading line [red]#{index}[/red]: {line}')
    error_info = {
        'status': 'json_load_error',
        'index': index,
        'json': line,
        'detail': str(exc),
    }
    container.append(json.dumps(error_info))
def _handle_http_error(
    container: list,
    index: int,
    line: str,
    exc: HTTPError,
):
    """Report an HTTP failure while posting input line `index` and record it.

    A warning goes to the console; a JSON-encoded error description,
    including the URL and the HTTP status and response body, is appended to
    `container` for later emission on stdout.
    """
    console.print(f'[red]Error[/red]: error writing line [red]#{index}[/red]: {line}')
    error_info = {
        'status': 'http_error',
        'index': index,
        'json': line,
        'url': exc.request.url,
        'http_status': exc.response.status_code,
        'http_response': exc.response.text,
    }
    container.append(json.dumps(error_info))
def _handle_schema_type_error(
    container: list,
    index: int,
    record: dict,
):
    """Report a record (input line `index`) whose class cannot be determined.

    A warning goes to the console; a JSON-encoded error description is
    appended to `container` for later emission on stdout.
    """
    console.print(f'[red]Error[/red]: no `schema type` in record [red]#{index}[/red], pid: [red]{record["pid"]}[/red]')
    error_info = {
        'status': 'class_reading_error',
        'pid': record['pid'],
        'record': record,
        'message': f'could not get class from record {record["pid"]}',
    }
    container.append(json.dumps(error_info))
def get_class_from_record(record: dict) -> str | None:
    """Return the de-prefixed class name from `record`'s `schema_type`.

    Returns `None` when the record carries no `schema_type` property.
    """
    return de_prefix(record['schema_type']) if 'schema_type' in record else None

View file

@ -0,0 +1,9 @@
from dump_things_pyclient.tests.fixtures import (
dump_stores_simple,
dump_things_service,
)
__all__ = [
'dump_stores_simple',
'dump_things_service',
]

View file

@ -0,0 +1,123 @@
name: demo-research-information-schema
description: |
A minimal schema that is compatible with concepts.datalad.org's type
definitions and annotations.
id: https://concepts.datalad.org/s/test-schemas/dump-things-pyclient/unreleased
version: UNRELEASED
license: MIT
prefixes:
rdf:
prefix_prefix: rdf
prefix_reference: http://www.w3.org/1999/02/22-rdf-syntax-ns#
rdfs:
prefix_prefix: rdfs
prefix_reference: http://www.w3.org/2000/01/rdf-schema#
xsd:
prefix_prefix: xsd
prefix_reference: http://www.w3.org/2001/XMLSchema#
test:
prefix_prefix: test
prefix_reference: http://www.example.org/test
emit_prefixes:
- xsd
- test
default_prefix: test
types:
string:
name: string
description: A character string
from_schema: https://w3id.org/linkml/types
base: str
uri: xsd:string
uriorcurie:
name: uriorcurie
description: a URI or a CURIE
notes:
- If you are authoring schemas in LinkML YAML, the type is referenced with the
lower case "uriorcurie".
from_schema: https://w3id.org/linkml/types
base: URIorCURIE
uri: xsd:anyURI
repr: str
NodeUriOrCurie:
name: NodeUriOrCurie
typeof: uriorcurie
base: str
uri: rdfs:Resource
slots:
family_name:
name: family_name
title: Family name
range: string
given_name:
name: given_name
title: Given name
range: string
annotations:
name: annotations
description: A record of properties of the metadata record on a subject, a collection
of tag/text tuples with the semantics of OWL Annotation.
title: Annotations
range: Annotation
multivalued: true
inlined: true
annotation_tag:
name: annotation_tag
description: A tag identifying an annotation.
title: Tag
range: Thing
annotation_value:
name: annotation_value
description: The actual annotation.
title: Value
range: string
pid:
name: pid
description: Persistent and globally unique identifier of a `Thing`.
title: Persistent identifier
identifier: true
range: uriorcurie
schema_type:
name: schema_type
slot_uri: rdf:type
designates_type: true
range: NodeUriOrCurie
title:
name: title
range: string
classes:
Person:
name: Person
description: Person agents are people, alive, dead, or fictional.
is_a: Thing
slots:
- family_name
- given_name
ThingMixin:
name: ThingMixin
mixin: true
slots:
- annotations
- schema_type
Thing:
name: Thing
mixins:
- ThingMixin
slots:
- pid
Annotation:
name: Annotation
slots:
- annotation_tag
- annotation_value
slot_usage:
annotation_tag:
name: annotation_tag
key: true
range: AnnotationTag
AnnotationTag:
is_a: Thing
Document:
is_a: Thing
slots:
- title

View file

@ -0,0 +1,63 @@
from pathlib import Path
from typing import Generator
import yaml
def read_records_from_store(
    store: Path,
    collection: str = 'collection_1',
    incoming: str | None = None,
    class_name: str = '*',
    remove_keys: list | None = None,
) -> Generator[tuple[Path, str | None, dict], None, None]:
    """Read records from a dumpthings-"recorddir" backend.

    By default records are read from the curated area. To read from the
    incoming area instead, set `incoming` to a glob expression matching the
    user IDs whose records should be read ('*' for all user IDs).

    `class_name` is used as a glob expression for classes; the default '*'
    matches all classes, while e.g. `*Document` matches `XYZDocument` and
    `Document`.

    If `remove_keys` is given, those keys are stripped from each record
    before it is yielded (e.g. `['annotations']` to drop annotations).

    Yields tuples of (record_path, user_id or None, record content).
    """
    config = yaml.safe_load((store / '.dumpthings.yaml').read_text())
    collection_config = config['collections'][collection]
    curated_dir = collection_config['curated']
    incoming_dir = collection_config['incoming']
    if incoming:
        base_dir = store / incoming_dir
        glob_expression = f'{incoming}/{class_name}/**/*.yaml'
    else:
        base_dir = store / curated_dir
        glob_expression = f'{class_name}/**/*.yaml'
    for record_path in base_dir.glob(glob_expression):
        # The store's own config file is not a record.
        if record_path.name == '.dumpthings.yaml':
            continue
        # In the incoming area the first path component below `base_dir`
        # is the user ID that owns the inbox.
        user_id = (
            record_path.parts[len(base_dir.parts)]
            if incoming
            else None
        )
        record = yaml.safe_load(record_path.read_text())
        if remove_keys:
            record = {
                key: value
                for key, value in record.items()
                if key not in remove_keys
            }
        yield (record_path, user_id, record)

View file

@ -0,0 +1,115 @@
import os
import signal
import socket
import subprocess
import time
from pathlib import Path
import pytest
import requests
from dump_things_service import config_file_name
# String representation of curated- and incoming-path
curated = 'curated'
incoming = 'incoming'
# Path to a local simple test schema
schema_path = Path(__file__).parent / 'assets' / 'pyclient_testschema.yaml'
schema_path = schema_path.resolve()
# The global configuration file, all collections and
# staging areas share the same directories. All tokens
# of the same collection share an "incoming_label".
global_config_text = f"""
type: collections
version: 1
collections:
collection_1:
default_token: basic_access
curated: {curated}
incoming: {incoming}
backend:
type: record_dir
auth_sources:
- type: config
submission_tags:
submitter_id_tag: https://submitter.example.com
submission_time_tag: https://time.example.com
tokens:
basic_access:
user_id: tester
collections:
collection_1:
mode: WRITE_COLLECTION
incoming_label: tester
token-curator:
user_id: test_curator
collections:
collection_1:
mode: CURATOR
incoming_label: test_curator
"""
collection_1_config = f"""type: records
version: 1
schema: {schema_path}
format: yaml
idfx: digest-md5
"""
@pytest.fixture(scope='session')
def dump_stores_simple(tmp_path_factory):
    """Create an on-disk dump-things store with a single collection.

    The store lives in a session-scoped temporary directory and consists of
    the global config file (`global_config_text`), empty `curated` and
    `incoming` directories, and the collection config
    (`collection_1_config`) inside the curated directory.

    Returns the store's root path.
    """
    tmp_path = tmp_path_factory.mktemp('dump_store')

    # Write the global config file, create collection directories, and write
    # the collection config file.
    (tmp_path / config_file_name).write_text(global_config_text)
    (tmp_path / curated).mkdir(parents=True, exist_ok=True)
    (tmp_path / incoming).mkdir(parents=True, exist_ok=True)
    (tmp_path / curated / config_file_name).write_text(collection_1_config)
    return tmp_path
@pytest.fixture(scope='session')
def dump_things_service(dump_stores_simple):
    """Run a `dump-things-service` instance on the store for the session.

    Yields `(port, store_path)` and terminates the server process on
    teardown.
    """
    # Start a server on an unused port.
    # NOTE(review): the port is probed and then released before the server
    # binds it, so another process could grab it in between — acceptable for
    # tests, but a known race.
    port = _get_unused_port()
    process_info = subprocess.Popen(
        ['dump-things-service', '--port', str(port), str(dump_stores_simple)],
    )
    # Wait until the server is responding to requests; give up after roughly
    # ten connection attempts (~10 seconds). A non-200 response aborts
    # immediately because the server is up but misbehaving.
    waited = 1
    while True:
        time.sleep(1)
        try:
            result = requests.get(f'http://127.0.0.1:{port}/server')
            if result.status_code == 200:
                break
            msg = f'dump-things-server responded with {result.status_code}: {result.text}'
            raise ValueError(msg)
        except requests.exceptions.ConnectionError:
            waited += 1
            if waited >= 10:
                raise Exception from None
            continue
    # Give the port and storage location to the user
    yield port, dump_stores_simple
    # Shut the server down
    os.kill(process_info.pid, signal.SIGTERM)
    os.waitpid(process_info.pid, 0)
def _get_unused_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
addr = s.getsockname()
s.close()
return addr[1]

View file

@ -0,0 +1,312 @@
import json
import posix
import random
import subprocess
from itertools import chain
from click.testing import CliRunner
from dump_things_pyclient.commands.dtc import cli
from dump_things_pyclient.tests.common import read_records_from_store
# NOTE(review): `import posix` and the following pygments import look like
# accidental IDE auto-completion artifacts -- neither `posix` nor `r` is
# used anywhere in this module; consider removing both.
from pygments.lexers import r
prefix = 'https://www.example.com/ac_e2e_test/'
def _create_unique_records(offset: int, count: int) -> dict[int, dict]:
while True:
pids = tuple(set(random.randrange(offset, offset + 10000) for _ in range(count)))
if len(pids) == count:
break
return {
# NOTE: the order of keys is relevant because there are JSON-string
# comparisons in the change-set tests below.
pid: {
'schema_type': 'test:Person',
'pid': prefix + f'person_{pid}',
'family_name': f'grieg_{pid}',
'given_name': f'erwin_{pid}',
}
for pid in pids
}
def test_auto_curate_basic_end_to_end(dump_things_service):
    """End-to-end check of plain `dtc auto-curate`.

    Posts records to the inbox of user 'tester', runs auto-curation as a
    curator, and verifies that the inbox is emptied and that the records
    appear in the curated area (annotations are stripped before comparing).
    """
    port, store = dump_things_service
    new_records = tuple(_create_unique_records(10000, 5).values())
    # Add records to inbox
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_records
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'
    # Ensure that the records do not yet exist in the curated area
    stored_curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(store, class_name='Person', remove_keys=['annotations'])
        )
    )
    for record in new_records:
        assert record not in stored_curated_records, 'record already exists, possibly a random number collision'
    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'tester', f'http://127.0.0.1:{port}', 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'
    # Check that the inbox is empty
    stored_inbox_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(store, incoming='tester')
        )
    )
    assert stored_inbox_records == tuple(), 'Inbox not clean after auto-curation'
    # Check that the records are in the curated area
    stored_curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(store, class_name='Person', remove_keys=['annotations'])
        )
    )
    for record in new_records:
        assert record in stored_curated_records
def test_auto_curate_create_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """End-to-end check of `dtc auto-curate --create-change-set`.

    Seeds the curated area via a first auto-curation round, then uploads
    modified versions of those records plus brand-new records and creates a
    change set. Verifies the git status and diff content of the change-set
    repository and that annotations are stored separately in it.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('create_change_set')
    new_records = _create_unique_records(20000, 5)
    new_curated_records = _create_unique_records(30000, 5)
    # Add new curated records to inbox and auto-curate them to move them to
    # the curated area.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_curated_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'
    # Perform auto-curation
    result = runner.invoke(
        cli,
        ['--token=token-curator', 'auto-curate', '-i', 'tester', f'http://127.0.0.1:{port}', 'collection_1'],
    )
    assert result.exit_code == 0, 'dtc auto-curate failed'
    # Modify the records that were auto-curated and upload those together
    # with newly created records to the inbox.
    modified_curated_records = {
        # NOTE: the order of keys is relevant because there are JSON-string
        # comparisons in the change-set tests below.
        pid: {
            'schema_type': 'test:Person',
            'pid': record['pid'],
            'family_name': record['family_name'],
            'given_name': record['given_name'].replace('erwin', 'edvard'),
        }
        for pid, record in new_curated_records.items()
    }
    # Upload the modified (already curated) records and the new records to the
    # inbox.
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False)
            for record in chain(modified_curated_records.values(), new_records.values())
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'
    # Create a change set
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'tester',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'
    # Check the number of modified records in the change set
    result = subprocess.run(
        ['git', 'status', '-s',],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [
        line.strip()
        for line in result.stdout.decode().splitlines()
        if 'annotations' not in line
    ]
    assert len(lines) == len(new_records) + len(modified_curated_records), f'unexpected number of modified records: {len(lines)}'
    assert all(line.startswith('M records/tester') for line in lines), f'unexpected status for modified records: {lines}'
    # Check for expected diff content
    result = subprocess.run(
        ['git', 'diff', '-p',],
        cwd=str(change_set_dir),
        check=True,
        capture_output=True,
    )
    lines = [line.strip() for line in result.stdout.decode().splitlines()]
    # Every diff should be seven lines long, the line at index 5 contains the
    # previous content, the line at index 6 contains the new content.
    for patch in range(len(lines) // 7):
        old = json.loads(lines[(7 * patch) + 5][1:])
        new = json.loads(lines[(7 * patch) + 6][1:])
        pid = int(lines[7 * patch][-5:])
        if pid in new_records:
            # A brand-new record has no previous content (JSON `null`).
            assert old is None
            assert new == new_records[pid]
        else:
            assert old == new_curated_records[pid]
            assert new == modified_curated_records[pid]
    # Check that annotations are stored in the change set
    annotations = {
        int(p.name[-5:]): json.loads(p.read_text())
        for p in (change_set_dir / 'annotations').glob('tester/*')
    }
    assert len(annotations) == len(modified_curated_records) + len(new_curated_records)
def test_auto_curate_post_change_set_end_to_end(dump_things_service, tmp_path_factory):
    """End-to-end check of `dtc auto-curate --post-change-set`.

    Creates a change set from the inbox of user 'tester', posts it to the
    curated area once without and once with `--add-annotations`, and checks
    the presence/absence of the `annotations` property on the resulting
    curated records.
    """
    port, store = dump_things_service
    change_set_dir = tmp_path_factory.mktemp('post_change_set')
    new_records = _create_unique_records(40000, 5)
    # Add new records to inbox of 'tester'.
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in new_records.values()
        ) + '\n'
    )
    assert result.exit_code == 0, 'dtc post-records failed'
    # Create a change set with the records from 'tester's' inbox
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--create-change-set', str(change_set_dir),
            '-i', 'tester',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --create-change-set failed'
    # Post the changeset without annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set failed'
    # Check that new records have been posted without annotations
    curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(
                store,
                class_name='Person',
            )
        )
    )
    for record in new_records.values():
        assert record in curated_records
    for record in curated_records:
        try:
            # Records whose pid does not end in five digits raise ValueError
            # and are skipped -- they belong to other tests.
            record_pid = int(record['pid'][-5:])
            if record_pid in new_records:
                assert 'annotations' not in record, f'unexpected annotations in {record}'
        except ValueError:
            continue
    # Post the changeset with annotations
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'auto-curate',
            '--post-change-set', str(change_set_dir),
            '--add-annotations',
            f'http://127.0.0.1:{port}', 'collection_1',
        ],
    )
    assert result.exit_code == 0, 'dtc auto-curate --post-change-set --add-annotations failed'
    # Check that new records have been posted with annotations
    curated_records = tuple(
        map(
            lambda e: e[2],
            read_records_from_store(
                store,
                class_name='Person',
            )
        )
    )
    # Check that all record content has been posted
    cleaned_curated_records = tuple(
        {
            'schema_type': r['schema_type'],
            'pid': r['pid'],
            'family_name': r['family_name'],
            'given_name': r['given_name'],
        }
        for r in curated_records
    )
    for record in new_records.values():
        assert record in cleaned_curated_records
    # Check that annotations were posted.
    # NOTE(review): the replace-calls presumably undo the service's file-name
    # escaping ('-_' -> '/', '--' -> '-') to recover record PIDs -- confirm
    # against the service's escaping scheme.
    annotations = {
        p.name.replace('-_', '/').replace('--', '-'): json.loads(p.read_text())
        for p in change_set_dir.glob('annotations/tester/*')
    }
    for record in curated_records:
        try:
            record_pid = int(record['pid'][-5:])
            if record_pid in new_records:
                assert 'annotations' in record, f'missing annotations in {record}'
                assert record['annotations'] == annotations[record['pid']]
        except ValueError:
            continue

View file

@ -0,0 +1,12 @@
import json
import yaml
from click.testing import CliRunner
from dump_things_pyclient.commands.dtc import cli
def test_dtc_version():
    """The `dtc version` subcommand exits successfully."""
    cli_runner = CliRunner()
    outcome = cli_runner.invoke(cli, ['version'])
    assert outcome.exit_code == 0

View file

@ -0,0 +1,199 @@
import json
import yaml
from click.testing import CliRunner
from dump_things_pyclient.commands.dtc import cli
from dump_things_pyclient.tests.common import read_records_from_store
def test_dtc_post_get_end_to_end(dump_things_service):
    """Post records via `dtc post-records` and read them back.

    Verifies that posted records land (with annotations) in the inbox on
    disk and that `dtc get-records` returns them.
    """
    port, store = dump_things_service
    records = tuple(
        {"pid": f"test:person_{i}", "given_name": f"gustav_{i}", "family_name": f"maler_{i}"}
        for i in range(5)
    )
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ['--token=basic_access', 'post-records', f'http://127.0.0.1:{port}', 'collection_1', 'Person'],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in records
        ) + '\n'
    )
    assert result.exit_code == 0
    # Read all records from disk, check for annotations.
    for disk_record in map(
        lambda t: t[2],
        read_records_from_store(store, incoming='tester', class_name='Person')
    ):
        if disk_record['given_name'].startswith('gustav'):
            assert 'annotations' in disk_record
    # Ensure that all records from `records` are on disk
    cleaned_disk_records = tuple(
        map(
            lambda t: t[2],
            read_records_from_store(store, incoming='tester', class_name='Person', remove_keys=['annotations', 'schema_type'])
        )
    )
    assert all(r in cleaned_disk_records for r in records)
    # Check via get-records, this might return more than the records we just posted
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'get-records', f'http://127.0.0.1:{port}', 'collection_1'
        ],
    )
    assert result.exit_code == 0
    returned_records = tuple(json.loads(line) for line in result.output.splitlines())
    # Due to other tests, there might be more records in the server than the
    # ones that we just uploaded
    assert len(returned_records) >= len(records)
    # This assertion relies on the fact that the result of get-records is
    # ordered by PID. This is the case because the server does order the
    # results by PID (by default).
    cleaned_records = tuple(
        {k: v for k, v in r.items() if k not in ('annotations', 'schema_type')}
        for r in returned_records
    )
    for r in records:
        assert r in cleaned_records, f'record {r} not found in cleaned records'
def test_dtc_post_curated(dump_things_service):
    """Post records directly to the curated area via `post-records --curated`.

    Verifies that curated posting stores the records verbatim (no
    annotations added) and that they are retrievable via `get-records`.
    """
    port, store = dump_things_service
    # We add `schema_type` to the test record because the server API code, i.e.,
    # the fastapi pydantic interface classes that are generated via linkml
    # add it automatically. The curated interface does never use a schema_type
    # layer, so `schema_type` will end up in the persisted record. Adding it
    # to our input record keeps the input and output records consistent, and
    # we don't have to care about the schema_type attribute when comparing
    # uploaded and downloaded records.
    records = tuple(
        {
            "pid": f"test:person_curated_{i}",
            "given_name": f"hans_{i}",
            "family_name": f"post_curated_{i}",
            'schema_type': 'test:Person',
        }
        for i in range(5)
    )
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            '--token=token-curator',
            'post-records', '--curated', f'http://127.0.0.1:{port}', 'collection_1', 'Person'
        ],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False) for record in records
        ) + '\n'
    )
    assert result.exit_code == 0
    # Check on disk, read all records from disk, check for annotations,
    # delete their annotations, and ensure that all records from `records`
    # are in there.
    curated = store / 'curated' / 'Person'
    disk_records = tuple(
        yaml.load(yaml_file.read_text(), Loader=yaml.SafeLoader)
        for yaml_file in curated.glob('**/*.yaml')
    )
    # Check that curated access does not add annotations
    for record in disk_records:
        if record['given_name'].startswith('hans_'):
            assert 'annotations' not in record
    # Check that all uploaded records appear verbatim on disk, i.e., that they
    # were really uploaded through the `curated`-interface.
    assert all(r in disk_records for r in records)
    # Check via get-records, this might return more than the records we just posted
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'get-records',
            '--class', 'Person',
            f'http://127.0.0.1:{port}', 'collection_1'
        ],
    )
    assert result.exit_code == 0
    returned_records = tuple(json.loads(line) for line in result.output.splitlines())
    assert len(returned_records) >= len(records)
    assert all(r in returned_records for r in records)
def test_dtc_post_record_any_class(dump_things_service):
    """Post records of mixed classes with the wildcard class name '*'.

    Uploads Person and Document records in one `post-records` invocation and
    verifies that each record was stored under the class directory derived
    from its `schema_type` property.
    """
    port, store = dump_things_service
    person_records = tuple(
        {
            "pid": f"test:person_wildcard_{i}",
            "given_name": f"johann_{i}",
            "family_name": f"post_wildcard_class_{i}",
            'schema_type': 'test:Person',
        }
        for i in range(3)
    )
    document_records = tuple(
        {
            "pid": f"test:document_wildcard_{i}",
            "title": f"document_{i}",
            'schema_type': 'test:Document',
        }
        for i in range(3)
    )
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            '--token=basic_access',
            'post-records',
            f'http://127.0.0.1:{port}', 'collection_1', '*',
        ],
        input='\n'.join(
            json.dumps(record, ensure_ascii=False)
            for record in person_records + document_records
        ) + '\n'
    )
    assert result.exit_code == 0
    # Check that the records were associated with the correct class on disk.
    # (This checks the interpretation of the schema_type attribute in the
    # input records and the interface implementation.) The same check runs
    # for both classes, so it is expressed once as a loop instead of two
    # copy-pasted stanzas.
    for class_name, expected_records in (
        ('Person', person_records),
        ('Document', document_records),
    ):
        inbox = store / 'incoming' / 'tester' / class_name
        disk_records = tuple(
            yaml.load(yaml_file.read_text(), Loader=yaml.SafeLoader)
            for yaml_file in inbox.glob('**/*.yaml')
        )
        cleaned_disk_records = tuple(
            {k: v for k, v in r.items() if k not in ('annotations',)}
            for r in disk_records
        )
        assert all(r in cleaned_disk_records for r in expected_records)

View file

@ -8,10 +8,10 @@ authors = [
{name="Christian Mönch", email="christian.moench@web.de"},
]
dependencies = [
"click>=8.3.1",
"pyyaml>=6.0.3",
"requests>=2.32.5",
"rich-click>=1.9.6",
"click",
"pyyaml",
"requests",
"rich-click",
]
[project.optional-dependencies]
@ -19,6 +19,15 @@ ttl = [
"dump-things-service>=5.3.0",
]
tests = [
"dump-things-service>=5.3.0",
"pytest>=9.0.1",
]
[dependency-groups]
tests = [
"linkml>=1.10.0",
"linkml-runtime>=1.10.0",
"dump-things-service>=5.3.0",
"pytest>=9.0.1",
]

1489
uv.lock generated

File diff suppressed because it is too large Load diff