329 lines
8.4 KiB
Python
329 lines
8.4 KiB
Python
import datetime
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
Generator,
|
|
)
|
|
|
|
import rich_click as click
|
|
import yaml
|
|
from rich.console import Console
|
|
from rich.progress import track
|
|
|
|
from ...communicate import (
|
|
HTTPError,
|
|
curated_read_records,
|
|
get_session,
|
|
incoming_read_labels,
|
|
incoming_read_records,
|
|
server,
|
|
)
|
|
from .common.prefix import de_prefix
|
|
|
|
|
|
# Name of this subcommand.
# NOTE(review): presumably read by the package's CLI loader to register `cli`
# under this name — confirm against the caller.
subcommand_name = 'export'
|
|
|
|
# Rich console bound to stderr. Progress bars and error messages are printed
# through it, which keeps them separate from the JSONL error report that
# `export` writes to stdout with `click.echo`.
console = Console(file=sys.stderr)
|
|
|
|
|
|
@click.command(short_help='Export a collection to the file system')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.argument(
    'destination',
    type=click.Path(
        exists=False,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='DESTINATION_DIR',
)
@click.option(
    '--format', '-f', 'output_format',
    type=click.Choice(('json', 'yaml'), case_sensitive=True),
    default='json',
    help='select output format for the exported records (default: json)',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='ignore records with missing `schema_type` instead of raising an error',
)
@click.option(
    '--keep-schema-type', '-k',
    default=False,
    is_flag=True,
    help='keep `schema_type`-attribute in records on file-system. By default the '
         'schema_type-attribute is removed because the class is encoded in the '
         'storage path of the records.'
)
@click.option(
    '--json-error-messages',
    default=False,
    is_flag=True,
    help='if this flag is given, output information about failed read or write '
         'operations to stdout. The format is JSONL (JSON lines), each JSON '
         'record contains the operation type (read, write), a detailed error '
         'message, and additional context dependent information, e.g., the PID '
         'of the record that could not be written to the file system.'
)
def cli(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    json_error_messages: bool,
):
    """Export a collection to disk

    This command exports all records that are stored in curated area and in the
    incoming areas of collection COLLECTION of the dump-things service
    SERVICE_URL.

    Exported records are written to the directory DESTINATION_DIR.
    DESTINATION_DIR must not exist, `export` will create it.

    A token with curator rights has to be provided.
    """
    # NOTE: the docstring above is the command's `--help` text, so parameter
    # documentation lives in these comments instead.
    # `obj` is the click context object; `export` uses it as the token.
    # `export` returns the process exit code (0 on success, 1 on failure).
    # The error types handled below are reported as a one-line message and
    # always end with exit code 1; any other exception propagates unchanged.
    from rich.markup import escape

    try:
        exit_code = export(
            obj,
            service_url,
            collection,
            destination,
            output_format,
            ignore_errors,
            keep_schema_type,
            json_error_messages,
        )
    except HTTPError as e:
        # `response` may be None when the error was raised without one; the
        # server text is escaped so rich does not interpret `[...]` as markup.
        response_text = getattr(e.response, 'text', '') or ''
        console.print(
            f'[red]Error[/red]: {escape(str(e))}: {escape(response_text)}'
        )
    except FileExistsError as e:
        # Raised by the `mkdir(exist_ok=False)` calls in `export`, most
        # commonly because DESTINATION_DIR already exists.
        console.print(
            f'[red]Error[/red]: directory already exists: {escape(str(e.filename))}'
        )
    except ValueError as e:
        console.print(f'[red]Error[/red]: {escape(str(e))}')
    else:
        sys.exit(exit_code)
    sys.exit(1)
|
|
|
|
|
|
def export(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    json_error_messages: bool,
) -> int:
    """Export curated and incoming records of a collection to ``destination``.

    The following is created below ``destination``, which must not exist yet:

    - ``description.json``: export date, service URL, collection name, and
      the collection's ``schema`` as reported by the service,
    - ``curated/``: the records of the curated area,
    - ``incoming/<label>/``: the records of each incoming area.

    :param obj: click context object; used as the authentication token.
    :param service_url: URL of the dump-things service.
    :param collection: name of the collection to export.
    :param destination: directory to create and write into.
    :param output_format: key into ``writer`` (``'json'`` or ``'yaml'``).
    :param ignore_errors: skip records without ``schema_type`` instead of
        stopping the export of the current area at the first such record.
    :param keep_schema_type: keep ``schema_type`` in the written records.
    :param json_error_messages: print the collected error records as JSON
        lines to stdout.
    :return: 0 if every record was exported, 1 otherwise (including a missing
        token or an unknown collection).
    :raises FileExistsError: if ``destination`` already exists.
    :raises ValueError: if an incoming-area label is not a plain directory
        name, or if ``output_format`` is not supported.
    """
    token = obj
    if token is None:
        console.print('[red]Error[/red]: no token provided')
        return 1

    session = get_session()
    server_info = server(service_url, session=session)
    collection_info = next(
        (c for c in server_info['collections'] if c['name'] == collection),
        None,
    )
    if not collection_info:
        console.print(f'[red]Error[/red]: no such collection: {collection}')
        return 1

    description = {
        # Local time including its UTC offset, so the timestamp is unambiguous.
        'date': datetime.datetime.now().astimezone().isoformat(),
        'service': service_url,
        'name': collection,
        'schema': collection_info['schema'],
    }

    destination.mkdir(parents=True, exist_ok=False)
    # `ensure_ascii=False` emits non-ASCII characters verbatim, so the file
    # encoding must not depend on the platform's locale.
    (destination / 'description.json').write_text(
        json.dumps(description, indent=2, ensure_ascii=False) + '\n',
        encoding='utf-8',
    )

    # Store the curated records
    curated_destination = destination / 'curated'
    curated_destination.mkdir()

    console.print('Exporting records from curated area')
    failed = _store_records(
        curated_read_records(
            service_url=service_url,
            collection=collection,
            token=token,
            session=session,
        ),
        curated_destination,
        output_format,
        ignore_errors,
        keep_schema_type,
        source_name='curated area',
    )

    # Store the incoming records
    for label in incoming_read_labels(
        service_url=service_url,
        collection=collection,
        token=token,
        session=session,
    ):
        # Labels come from the service and become a directory name. Reject
        # anything that is not a single plain path component, so that a label
        # cannot place files outside of `destination / 'incoming'`.
        if label in ('', '.', '..') or Path(label).name != label:
            msg = f'invalid incoming area label: {label!r}'
            raise ValueError(msg)

        console.print(f'Exporting records from incoming area: {label}')
        incoming_destination = destination / 'incoming' / label
        incoming_destination.mkdir(parents=True, exist_ok=False)
        failed.extend(
            _store_records(
                incoming_read_records(
                    service_url=service_url,
                    collection=collection,
                    label=label,
                    token=token,
                    session=session,
                ),
                incoming_destination,
                output_format,
                ignore_errors,
                keep_schema_type,
                source_name=f'incoming area: {label}',
            )
        )

    if failed:
        if json_error_messages:
            click.echo('\n'.join(failed))
        return 1

    return 0
|
|
|
|
|
|
def _store_records(
    source: Generator,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    source_name: str,
) -> list:
    """Write every record produced by ``source`` below ``destination``.

    Each item of ``source`` must be a 5-tuple. Its first element is the
    record (a dict) and its fifth element is passed to the progress bar as
    the total count; only the first item's total is used. A record is
    written to ``<destination>/<class>/<hash[:3]>/<hash[3:]>.<ext>``, where
    ``<class>`` is the de-prefixed ``schema_type`` and ``hash`` is the hex
    digest computed by ``_hash_p3`` from the record's ``pid``.

    Records without ``schema_type`` are reported via
    ``_handle_schema_type_error``. They are then skipped when
    ``ignore_errors`` is set; otherwise processing of ``source`` stops there.

    :return: list of JSON strings describing records that were not written.
    :raises ValueError: if ``output_format`` has no entry in ``writer``.
    """
    write_record = writer.get(output_format)
    if write_record is None:
        raise ValueError(f'unsupported output format: {output_format}')

    problems: list = []
    known_dirs: set = set()

    # Peek at the first item: it carries the total for the progress bar, and
    # an empty source means there is nothing to do.
    missing = object()
    head = next(source, missing)
    if head is missing:
        return problems

    progress = track(chain((head,), source), total=head[4], console=console)
    for record, _, _, _, _ in progress:
        schema_type = record.get('schema_type')
        if schema_type is None:
            _handle_schema_type_error(problems, record, source_name)
            if not ignore_errors:
                break
            continue

        class_name = de_prefix(schema_type)
        if not keep_schema_type:
            # The class is already encoded in the storage path.
            record.pop('schema_type')

        bucket, stem = _hash_p3(record['pid'])
        target_dir = destination / class_name / bucket
        if target_dir not in known_dirs:
            target_dir.mkdir(parents=True, exist_ok=False)
            known_dirs.add(target_dir)

        write_record(file_dir=target_dir, file_name=stem, record=record)

    return problems
|
|
|
|
|
|
def _get_hex_digest(
|
|
data: str,
|
|
) -> str:
|
|
hash_context = hashlib.md5(data.encode())
|
|
return hash_context.hexdigest()
|
|
|
|
|
|
def _hash_p3(
    pid: str,
) -> tuple[str, str]:
    """Split the hex digest of ``pid`` into a directory and a file-name part.

    The first three hex characters name the bucket directory; the remaining
    characters form the file name (without extension).
    """
    digest = _get_hex_digest(pid)
    split_at = 3
    directory_part = digest[:split_at]
    name_part = digest[split_at:]
    return directory_part, name_part
|
|
|
|
|
|
def _handle_schema_type_error(
    container: list,
    record: dict,
    source_name: str,
):
    """Report a record that has no ``schema_type`` attribute.

    Prints a human-readable error on ``console`` and appends a JSON-encoded
    error description (keys ``status``, ``pid``, ``source``, ``message``) to
    ``container``.

    :param container: list collecting JSON error strings; modified in place.
    :param record: the offending record. Its ``pid`` is reported; it is
        ``None`` (JSON ``null``) if the record has no ``pid`` either.
    :param source_name: human-readable name of the area the record came from.
    """
    from rich.markup import escape

    # A record without `schema_type` may be malformed in other ways as well;
    # a missing `pid` must not turn error reporting into a KeyError.
    pid = record.get('pid')
    # Server-provided text is escaped so rich does not treat `[...]` as markup.
    console.print(
        f'[red]Error[/red]: no `schema_type` in record '
        f'[red]{escape(str(pid))}[/red] in {escape(source_name)}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'pid': pid,
                'source': source_name,
                'message': f'no `schema_type` in record {pid}',
            }
        )
    )
|
|
|
|
|
|
def write_json(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write ``record`` as indented JSON to ``<file_dir>/<file_name>.json``.

    With ``ensure_ascii=False`` non-ASCII characters are written verbatim.
    The file is therefore always encoded as UTF-8: the locale's default
    encoding might not be able to represent them, and would make the output
    platform-dependent.
    """
    (file_dir / (file_name + '.json')).write_text(
        json.dumps(record, indent=2, ensure_ascii=False) + '\n',
        encoding='utf-8',
    )
|
|
|
|
|
|
def write_yaml(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write ``record`` as block-style YAML to ``<file_dir>/<file_name>.yaml``.

    Key order is preserved (``sort_keys=False``). With
    ``allow_unicode=True`` non-ASCII characters are emitted verbatim, so the
    file is always encoded as UTF-8 instead of the locale's default encoding.
    """
    (file_dir / (file_name + '.yaml')).write_text(
        yaml.dump(
            data=record,
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
        ),
        encoding='utf-8',
    )
|
|
|
|
|
|
# Dispatch table: value of `--format` -> function writing a single record
# in that format (called with `file_dir`, `file_name` and `record`).
writer = dict(
    json=write_json,
    yaml=write_yaml,
)
|