334 lines
8.6 KiB
Python
334 lines
8.6 KiB
Python
import json
|
|
import sys
|
|
from functools import (
|
|
partial,
|
|
reduce,
|
|
)
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
)
|
|
|
|
import rich_click as click
|
|
import yaml
|
|
from rich.console import Console
|
|
from rich.progress import track
|
|
|
|
from ...communicate import (
|
|
HTTPError,
|
|
Session,
|
|
curated_write_record,
|
|
get_session,
|
|
incoming_write_record,
|
|
)
|
|
|
|
# Name of this subcommand (presumably used by the `dtc` entry point to
# register the command — verify against the caller).
subcommand_name = 'import'

# All diagnostics and progress output go to stderr, so that stdout stays
# reserved for the JSONL error records printed via `click.echo` when
# `--json-error-messages` is given.
console = Console(file=sys.stderr)
|
|
|
|
|
|
@click.command(short_help='Import a collection from the file system')
@click.pass_obj
@click.argument(
    'source',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        # The export directory is only read, never modified.
        readable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='SOURCE_DIR',
)
@click.option(
    '--service-url', '-s',
    metavar='SERVICE_URL',
    default=None,
    help='use the service SERVICE_URL instead of the service URL that is '
         'stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--collection', '-c',
    metavar='COLLECTION',
    default=None,
    help='use the collection name COLLECTION instead of the collection name '
         'that is stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='log errors and continue import instead of aborting it',
)
@click.option(
    '--json-error-messages',
    default=False,
    is_flag=True,
    help='if this flag is given, output information about failed read or write '
         'operations to stdout. The format is JSONL (JSON lines), each JSON '
         'record contains the operation type (read, write), a detailed error '
         'message, and additional context dependent information, e.g., the PID '
         'of the record that could not be posted to the collection.'
)
def cli(
    obj: Any,
    source: Path,
    service_url: str,
    collection: str,
    ignore_errors,
    json_error_messages,
):
    """Import a collection or part of a collection from disk

    This command imports all records that are stored on disk in the directory
    SOURCE_DIR in the format that is created by `dtc export`. The records are
    stored in the dump-things service and the collection that are recorded in
    `SOURCE_DIR/description.json`, unless `--service-url` or `--collection`
    are given.

    The command is idempotent, there is no harm in executing the same command
    on the same exported data twice. The command does not check for
    "completeness" of an export. This also allows importing only a subset of
    the exported classes by deleting those classes from the file system that
    should not be imported.

    A token with curator rights has to be provided.
    """
    # `obj` is the click context object; it is handed on as the token.
    # Escape interpolated error text so that brackets in it (e.g. "[Errno 2]")
    # are printed literally instead of being interpreted as Rich markup.
    from rich.markup import escape

    try:
        exit_code = import_collection(
            obj,
            service_url,
            collection,
            source,
            ignore_errors,
            json_error_messages,
        )
    except HTTPError as e:
        # NOTE(review): `response` is assumed to be possibly None for errors
        # raised without a server reply; fall back to no response text then.
        response = getattr(e, 'response', None)
        response_text = response.text if response is not None else ''
        console.print(
            f'[red]Error[/red]: {escape(str(e))}: {escape(str(response_text))}'
        )
    except (OSError, ValueError) as e:
        # OSError covers e.g. an unreadable `description.json`; ValueError
        # covers invalid JSON and an unresolvable service URL / collection.
        console.print(f'[red]Error[/red]: {escape(str(e))}')
    else:
        sys.exit(exit_code)
    # Every handled error terminates the command with a failure exit code.
    sys.exit(1)
|
|
|
|
|
|
def import_collection(
    token: str,
    service_url: str,
    collection: str,
    source: Path,
    ignore_errors: bool,
    json_error_messages: bool,
) -> int:
    """Import the curated area and all incoming areas stored below ``source``.

    :param token: token used for the write requests; ``None`` aborts the
        import with an error message.
    :param service_url: URL of the target service, or a falsy value to use
        ``service_url`` from ``source/description.json``.
    :param collection: name of the target collection, or a falsy value to use
        ``name`` from ``source/description.json``.
    :param source: export directory containing ``description.json``, a
        ``curated`` directory and an optional ``incoming`` directory with one
        subdirectory per incoming area.
    :param ignore_errors: if true, keep importing after failed reads/writes;
        otherwise stop at the first failure.
    :param json_error_messages: if true, print one JSON line per failure to
        stdout before returning.
    :return: ``0`` if all records were imported, ``1`` otherwise.
    :raises ValueError: if ``description.json`` is not valid JSON, or if
        neither the arguments nor ``description.json`` provide a service URL
        and a collection name.
    :raises OSError: if ``description.json`` cannot be read.
    """
    if token is None:
        console.print('[red]Error[/red]: no token provided')
        return 1

    description_path = source / 'description.json'
    with description_path.open(encoding='utf-8') as description_file:
        description = json.load(description_file)

    # Resolve the target once, so that curated *and* incoming records are
    # written to the same service and collection.
    service_url = service_url or description.get('service_url')
    collection = collection or description.get('name')
    if not service_url or not collection:
        raise ValueError(
            'no service URL or collection name given, and none found in '
            f'{description_path}'
        )

    session = get_session()

    # Import the curated records
    failed = _load_records(
        service_url=service_url,
        collection=collection,
        token=token,
        source=source / 'curated',
        writer=curated_write_record,
        location_info='curated area',
        ignore_errors=ignore_errors,
        session=session,
    )

    # Import the incoming areas, unless the curated import already failed
    # and errors are not to be ignored. Areas are processed in a stable
    # (sorted) order so that repeated runs behave the same.
    if ignore_errors or not failed:
        for incoming_source in sorted((source / 'incoming').glob('*')):
            label = incoming_source.name
            console.print(f'processing incoming area {label}')

            failed.extend(
                _load_records(
                    service_url,
                    collection,
                    token,
                    incoming_source,
                    partial(incoming_write_record, label=label),
                    location_info=f'incoming area {label}',
                    ignore_errors=ignore_errors,
                    session=session,
                )
            )

            if failed and not ignore_errors:
                break

    if failed:
        if json_error_messages:
            click.echo('\n'.join(failed))
        return 1

    return 0
|
|
|
|
|
|
def _load_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    location_info: str,
    ignore_errors: bool,
    session: Session,
) -> list:
    """Import every class directory found directly below ``source``.

    The name of each entry below ``source`` is used as the class name of the
    records stored beneath it. Returns the JSON-encoded error records of all
    failed reads/writes. Unless ``ignore_errors`` is set, processing stops
    after the first class that produced a failure.
    """
    collected = []
    for class_dir in source.glob('*'):
        collected += _load_class_records(
            service_url=service_url,
            collection=collection,
            token=token,
            source=class_dir,
            writer=writer,
            class_name=class_dir.name,
            location_info=location_info,
            ignore_errors=ignore_errors,
            session=session,
        )
        if collected and not ignore_errors:
            break
    return collected
|
|
|
|
|
|
def _read_record(path: Path) -> Any:
    """Parse the record file ``path`` with the reader registered for its suffix.

    :raises ValueError: if no reader is registered for the file suffix.
    """
    reader = _file_reader.get(path.suffix[1:].lower())
    if reader is None:
        raise ValueError(f'unsupported file format {path.suffix!r}')
    # Read as UTF-8 (the JSON interchange encoding) rather than the
    # platform-dependent locale encoding.
    with path.open('rt', encoding='utf-8') as file:
        return reader(file)


def _write_error_details(error: Exception) -> str:
    """Return the server's response body for HTTP errors, else ``str(error)``."""
    response = getattr(error, 'response', None) if isinstance(error, HTTPError) else None
    return response.text if response is not None else str(error)


def _load_class_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    class_name: str,
    location_info: str,
    ignore_errors: bool,
    session: Session,
) -> list:
    """Write every record file below ``source`` as a record of ``class_name``.

    Each file (searched recursively) is parsed according to its suffix and
    passed to ``writer``. Failures are reported via ``_process_read_error`` /
    ``_process_write_error``.

    :return: the JSON-encoded error records of all failed reads/writes.
        Unless ``ignore_errors`` is set, processing stops at the first failure.
    """
    # Walk the tree once; the progress total then matches exactly the files
    # that are processed. Sorting makes the processing order reproducible.
    record_paths = sorted(
        path for path in source.glob('**/*') if not path.is_dir()
    )

    failed = []
    for path in track(
        record_paths,
        description=f'processing {location_info}, class {class_name} ... ',
        console=console,
        total=len(record_paths),
    ):
        try:
            record = _read_record(path)
        except Exception as e:
            _process_read_error(failed, path.suffix[1:], path, str(e))
            if not ignore_errors:
                break
            continue

        try:
            writer(
                service_url=service_url,
                collection=collection,
                class_name=class_name,
                record=record,
                token=token,
                session=session,
            )
        except Exception as e:
            _process_write_error(
                failed,
                record,
                class_name,
                collection,
                location_info,
                path,
                _write_error_details(e),
            )
            if not ignore_errors:
                break

    return failed
|
|
|
|
|
|
def _json_reader(
|
|
file
|
|
):
|
|
return json.load(file)
|
|
|
|
|
|
def _yaml_reader(
    file
):
    """Parse the YAML document contained in the text stream ``file``.

    The safe loader is used so that record files cannot construct arbitrary
    Python objects.
    """
    return yaml.safe_load(file)
|
|
|
|
|
|
def _process_read_error(
    container: list,
    file_format: str,
    path: Path,
    message: str,
):
    """Report a record file that could not be read.

    Prints an error to the console and appends a JSON-encoded error record
    (operation ``read``) to ``container``.

    :param container: list collecting JSON error lines; modified in place.
    :param file_format: file suffix without the leading dot.
    :param path: the file that could not be read.
    :param message: description of the failure.
    """
    # Escape interpolated values so that brackets in paths or exception
    # messages are printed literally instead of being taken as Rich markup.
    from rich.markup import escape

    console.print(
        f'[red]Error[/red]: could not read {escape(str(file_format))}-file '
        f'`{escape(str(path))}`: {escape(str(message))}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'operation': 'read',
                'format': file_format,
                'path': str(path),
                'details': message,
            }
        )
    )
|
|
|
|
|
|
def _process_write_error(
    container: list,
    record: dict,
    class_name: str,
    collection: str,
    location_info: str,
    path: Path,
    message: str,
):
    """Report a record that could not be written to the service.

    Prints an error to the console and appends a JSON-encoded error record
    (operation ``write``) to ``container``.

    :param container: list collecting JSON error lines; modified in place.
    :param record: the parsed record; its ``pid`` is reported if present.
    :param class_name: class the record was written as.
    :param collection: target collection.
    :param location_info: human-readable target area (e.g. curated area).
    :param path: file the record was read from.
    :param message: description of the failure.
    """
    # Escape interpolated values so that brackets in them are printed
    # literally instead of being taken as Rich markup.
    from rich.markup import escape

    # The record may be malformed (no `pid`, or not a mapping at all, e.g.
    # `None` from an empty YAML file); reporting must not raise on it.
    pid = record.get('pid') if isinstance(record, dict) else None
    console.print(
        f'[red]Error[/red]: could not write record with pid '
        f'[green]{escape(str(pid))}[/green] of class `{escape(str(class_name))}` '
        f'(file: [yellow]{escape(str(path))}[/yellow]) to: '
        f'"{escape(str(location_info))} of collection {escape(str(collection))}": '
        f'{escape(str(message))}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'operation': 'write',
                'pid': pid,
                'collection': collection,
                'path': str(path),
                'details': message,
            }
        )
    )
|
|
|
|
|
|
# Maps a lower-cased file suffix (without the leading dot) to the function
# that parses a record file in that format. `yml` is accepted as the common
# alternative spelling of the YAML suffix.
_file_reader = {
    'json': _json_reader,
    'yaml': _yaml_reader,
    'yml': _yaml_reader,
}
|