dump-things-pyclient/dump_things_pyclient/commands/dtc_plugins/import.py
2026-02-11 08:45:40 +01:00

334 lines
8.6 KiB
Python

import json
import sys
from functools import (
partial,
reduce,
)
from pathlib import Path
from typing import (
Any,
Callable,
)
import rich_click as click
import yaml
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
Session,
curated_write_record,
get_session,
incoming_write_record,
)
# Name of the sub-command implemented by this module. Presumably read by the
# `dtc` plugin loader to register the command -- TODO confirm against the loader.
subcommand_name = 'import'
# Console used for progress and human-readable error output. It is bound to
# stderr; the JSONL error records are emitted separately through `click.echo`.
console = Console(file=sys.stderr)
@click.command(short_help='Import a collection from the file system')
@click.pass_obj
@click.argument(
    'source',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        # The import only reads from SOURCE_DIR, so require read access
        # rather than write access (read-only exports must be importable).
        readable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='SOURCE_DIR',
)
@click.option(
    '--service-url', '-s',
    metavar='SERVICE_URL',
    default=None,
    help='use the service SERVICE_URL instead of the service URL that is '
         'stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--collection', '-c',
    metavar='COLLECTION',
    default=None,
    help='use the collection name COLLECTION instead of the collection name '
         'that is stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='log errors and continue import instead of raising an exception',
)
@click.option(
    '--json-error-messages',
    default=False,
    is_flag=True,
    help='if this flag is given, output information about failed read or write '
         'operations to stdout. The format is JSONL (JSON lines), each JSON '
         'record contains the operation type (read, write), a detailed error '
         'message, and additional context dependent information, e.g., the PID '
         'of the record that could not be posted to the collection.'
)
def cli(
    obj: Any,
    source: Path,
    service_url: str,
    collection: str,
    ignore_errors: bool,
    json_error_messages: bool,
):
    """Import a collection or part of a collection from disk

    This command imports all records that are stored on disk in the directory
    SOURCE_DIR in the format that is created by `dtc export`. The records are
    stored in the dump-things service and the collection that are recorded in
    `SOURCE_DIR/description.json`.

    The command is idempotent, there is no harm in executing the same command
    on the same exported data twice. The command does not check for
    "completeness" of an export. That allows also to import only a subset of
    the exported classes by deleting those classes from the file-system that
    should not be imported.

    A token with curator rights has to be provided.
    """
    # NOTE: the docstring above is rendered by click as the command's help
    # text, so it is written for end users; developer notes live in comments.
    #
    # `obj` is the click context object. It is forwarded unchanged as the
    # `token` argument of `import_collection`. `service_url` and `collection`
    # are None unless the corresponding option was given.
    try:
        # `import_collection` returns the process exit status (0 or 1).
        sys.exit(
            import_collection(
                obj,
                service_url,
                collection,
                source,
                ignore_errors,
                json_error_messages,
            )
        )
    except HTTPError as e:
        console.print(f'[red]Error[/red]: {e}: {e.response.text}')
    except ValueError as e:
        console.print(f'[red]Error[/red]: {e}')
    # Only reached after one of the handlers above has run: `sys.exit()` in
    # the `try` body raises SystemExit, which neither handler catches. Every
    # reported error therefore ends with a non-zero exit status.
    sys.exit(1)
def import_collection(
    token: str,
    service_url: str,
    collection: str,
    source: Path,
    ignore_errors: bool,
    json_error_messages: bool,
) -> int:
    """Import the curated area and all incoming areas stored below `source`.

    Args:
        token: Token handed to the record writers. If it is None, an error is
            printed and nothing is imported.
        service_url: Service URL to write to. If empty or None, the
            `service_url` entry of `source/description.json` is used.
        collection: Collection name to write to. If empty or None, the `name`
            entry of `source/description.json` is used.
        source: Directory that contains `description.json`, a `curated`
            directory and, optionally, an `incoming` directory with one
            sub-directory per incoming area.
        ignore_errors: If true, keep importing after a failed read or write;
            otherwise stop at the first failure.
        json_error_messages: If true, print the collected JSON error records
            (one per line) to stdout when at least one record failed.

    Returns:
        0 if every record was imported, 1 if no token was given or at least
        one read or write failed.

    Raises:
        Errors raised while opening or decoding `description.json` are not
        handled here and propagate to the caller, as does a KeyError for a
        description entry that is needed but missing.
    """
    if token is None:
        console.print('[red]Error[/red]: no token provided')
        return 1

    with (source / 'description.json').open() as description_file:
        description = json.load(description_file)

    # Resolve the target once, so that the curated area and the incoming
    # areas are written to the same service and collection. (Previously only
    # the curated import applied the fallback to the stored description.)
    service_url = service_url or description['service_url']
    collection = collection or description['name']

    session = get_session()

    # Import the curated records
    failed = _load_records(
        service_url=service_url,
        collection=collection,
        token=token,
        source=source / 'curated',
        writer=curated_write_record,
        location_info='curated area',
        ignore_errors=ignore_errors,
        session=session,
    )
    if failed and not ignore_errors:
        if json_error_messages:
            click.echo('\n'.join(failed))
        # Report the failure through the return value, like every other exit
        # path of this function; the caller converts it into the exit status.
        return 1

    # Import incoming areas; the directory name is the label of the area
    for incoming_source in (source / 'incoming').glob('*'):
        label = incoming_source.name
        console.print(f'processing incoming area {label}')
        failed.extend(
            _load_records(
                service_url=service_url,
                collection=collection,
                token=token,
                source=incoming_source,
                writer=partial(incoming_write_record, label=label),
                location_info=f'incoming area {label}',
                ignore_errors=ignore_errors,
                session=session,
            )
        )
        if failed and not ignore_errors:
            break

    if failed:
        if json_error_messages:
            click.echo('\n'.join(failed))
        return 1
    return 0
def _load_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    location_info: str,
    ignore_errors: bool,
    session: Session,
) -> list:
    """Import the records of every class found directly below `source`.

    Each entry of `source` is handed to `_load_class_records`, with the
    entry's name used as the class name. The error records returned for the
    individual classes are concatenated and returned. Unless `ignore_errors`
    is set, processing stops after the first class that reported an error.
    """
    stop_on_error = not ignore_errors
    errors = []
    for class_path in source.glob('*'):
        errors += _load_class_records(
            service_url=service_url,
            collection=collection,
            token=token,
            source=class_path,
            writer=writer,
            class_name=class_path.name,
            location_info=location_info,
            ignore_errors=ignore_errors,
            session=session,
        )
        if stop_on_error and errors:
            break
    return errors
def _load_class_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    class_name: str,
    location_info: str,
    ignore_errors: bool,
    session: Session,
) -> list:
    """Read every record file below `source` and pass it to `writer`.

    Args:
        service_url: Forwarded to `writer`.
        collection: Forwarded to `writer` and used in error reports.
        token: Forwarded to `writer`.
        source: Directory that is searched recursively for record files.
        writer: Callable invoked once per record with the keyword arguments
            `service_url`, `collection`, `class_name`, `record`, `token`
            and `session`.
        class_name: Class of the records, forwarded to `writer`.
        location_info: Human-readable name of the target area, used in the
            progress line and in error reports.
        ignore_errors: If true, continue with the next file after a failure;
            otherwise stop at the first failure.
        session: Forwarded to `writer`.

    Returns:
        A list with one JSON-encoded error record per failed read or write;
        empty if everything succeeded.
    """
    # Collect the files once. This avoids a second traversal of the tree and
    # keeps the progress total in step with the items that are iterated
    # (directories are neither counted nor shown as progress).
    files = [path for path in source.glob('**/*') if not path.is_dir()]

    failed = []
    for path in track(
        files,
        description=f'processing {location_info}, class {class_name} ... ',
        console=console,
        total=len(files),
    ):
        # The suffix without its leading dot selects the reader.
        file_format = path.suffix[1:]
        try:
            reader = _file_reader.get(file_format.lower())
            if reader is None:
                # Give a readable reason instead of a bare KeyError text.
                raise ValueError(f'unsupported file format: {file_format!r}')
            with path.open('rt') as file:
                record = reader(file)
        except Exception as e:
            # Deliberately broad: any failure to read or decode a file is
            # recorded as a read error rather than aborting with a traceback.
            _process_read_error(failed, file_format, path, str(e))
            if not ignore_errors:
                break
            continue

        try:
            writer(
                service_url=service_url,
                collection=collection,
                class_name=class_name,
                record=record,
                token=token,
                session=session,
            )
        except Exception as e:
            # For HTTP errors the response body is reported as the details,
            # for everything else the exception text.
            details = e.response.text if isinstance(e, HTTPError) else str(e)
            _process_write_error(
                failed,
                record,
                class_name,
                collection,
                location_info,
                path,
                details,
            )
            if not ignore_errors:
                break
    return failed
def _json_reader(file):
    """Decode the complete content of the open text stream `file` as JSON."""
    content = file.read()
    return json.loads(content)
def _yaml_reader(file):
    """Decode the open stream `file` as YAML using PyYAML's safe loader."""
    return yaml.safe_load(file)
def _process_read_error(
    container: list,
    file_format: str,
    path: Path,
    message: str,
):
    """Report a file that could not be read and record the failure.

    The error is printed to the module's console, and a JSON-encoded record
    describing it is appended to `container`.

    Args:
        container: List that receives the JSON-encoded error record.
        file_format: File suffix (without the dot) of the unreadable file.
        path: Location of the unreadable file.
        message: Details of the failure.
    """
    # Imported locally so that the block does not depend on a change of the
    # module's import section.
    from rich.markup import escape

    # Path and message are arbitrary text. Escape them so that brackets in
    # them are printed literally instead of being parsed as console markup.
    console.print(
        f'[red]Error[/red]: could not read {escape(file_format)}-file: '
        f'{escape(str(path))}: {escape(message)}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'operation': 'read',
                'format': file_format,
                'path': str(path),
                'details': message,
            }
        )
    )
def _process_write_error(
    container: list,
    record: dict,
    class_name: str,
    collection: str,
    location_info: str,
    path: Path,
    message: str,
):
    """Report a record that could not be written and record the failure.

    The error is printed to the module's console, and a JSON-encoded record
    describing it is appended to `container`.

    Args:
        container: List that receives the JSON-encoded error record.
        record: The decoded file content that was passed to the writer.
        class_name: Class the record was written as.
        collection: Name of the target collection.
        location_info: Human-readable name of the target area.
        path: File the record was read from.
        message: Details of the failure.
    """
    # Imported locally so that the block does not depend on a change of the
    # module's import section.
    from rich.markup import escape

    def plain(value) -> str:
        # Render arbitrary values literally, i.e. without letting brackets
        # in them be parsed as console markup.
        return escape(str(value))

    # The file content is not validated before it is written, so it may not
    # be a mapping or may lack a `pid`. Reporting the error must not fail in
    # that case, otherwise the import aborts even with errors ignored.
    pid = record.get('pid') if isinstance(record, dict) else None

    console.print(
        '[red]Error[/red]: could not write record with pid '
        f'[green]{plain(pid)}[/green] of class `{plain(class_name)}` '
        f'(file: [yellow]{plain(path)}[/yellow]) to: '
        f'"{plain(location_info)} of collection {plain(collection)}": '
        f'{plain(message)}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'operation': 'write',
                'pid': pid,
                'collection': collection,
                'path': str(path),
                'details': message,
            }
        )
    )
# Dispatch table from a record file's lower-cased suffix (without the leading
# dot) to the function that decodes an open text stream of that format.
_file_reader = {
'json': _json_reader,
'yaml': _yaml_reader,
}