dump-things-pyclient/dump_things_pyclient/commands/dtc_plugins/export.py
2026-03-10 01:49:03 +01:00

329 lines
8.4 KiB
Python

import datetime
import hashlib
import json
import sys
from itertools import chain
from pathlib import Path
from typing import (
Any,
Generator,
)
import rich_click as click
import yaml
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
curated_read_records,
get_session,
incoming_read_labels,
incoming_read_records,
server,
)
from .common.prefix import de_prefix
# Name of the subcommand provided by this plugin module.
subcommand_name: str = 'export'
# Diagnostics and progress bars go to stderr; stdout is reserved for the
# JSONL error report emitted with `--json-error-messages`.
console: Console = Console(file=sys.stderr)
@click.command(short_help='Export a collection to the file system')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.argument(
    'destination',
    type=click.Path(
        exists=False,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='DESTINATION_DIR',
)
@click.option(
    '--format', '-f', 'output_format',
    type=click.Choice(('json', 'yaml'), case_sensitive=True),
    default='json',
    help='select output format for the exported records (default: json)',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='ignore records with missing `schema_type` instead of raising an error',
)
@click.option(
    '--keep-schema-type', '-k',
    default=False,
    is_flag=True,
    help='keep `schema_type`-attribute in records on file-system. By default the '
    'schema_type-attribute is removed because the class is encoded in the '
    'storage path of the records.'
)
@click.option(
    '--json-error-messages',
    default=False,
    is_flag=True,
    help='if this flag is given, output information about failed read or write '
    'operations to stdout. The format is JSONL (JSON lines), each JSON '
    'record contains the operation type (read, write), a detailed error '
    'message, and additional context dependent information, e.g., the PID '
    'of the record that could not be written to the file system.'
)
def cli(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    json_error_messages: bool,
):
    """Export a collection to disk
    This command exports all records that are stored in curated area and in the
    incoming areas of collection COLLECTION of the dump-things service
    SERVICE_URL.
    Exported records are written to the directory DESTINATION_DIR.
    DESTINATION_DIR must not exist, `export` will create it.
    A token with curator rights has to be provided.
    """
    # The click context object carries the token; it is handed through
    # unchanged together with all command-line parameters.
    arguments = (
        obj,
        service_url,
        collection,
        destination,
        output_format,
        ignore_errors,
        keep_schema_type,
        json_error_messages,
    )
    try:
        exit_status = export(*arguments)
    except HTTPError as error:
        # Server-side failures: show the status and the response body.
        console.print(f'[red]Error[/red]: {error}: {error.response.text}')
    except ValueError as error:
        console.print(f'[red]Error[/red]: {error}')
    else:
        sys.exit(exit_status)
    # Only reached after one of the handled errors above.
    sys.exit(1)
def export(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    json_error_messages: bool,
) -> int:
    """Export the curated area and all incoming areas of a collection to disk.

    Creates ``destination`` and writes into it:

    - ``description.json``: export date, service URL, collection name and
      collection schema;
    - ``curated/``: the records of the curated area;
    - ``incoming/<label>/``: the records of every incoming area.

    :param obj: click context object, used as the access token.
    :param service_url: base URL of the dump-things service.
    :param collection: name of the collection to export.
    :param destination: directory to create; it must not exist yet.
    :param output_format: key of the module-level ``writer`` table
        (``'json'`` or ``'yaml'``).
    :param ignore_errors: passed to ``_store_records``; continue after a
        failing record instead of stopping that area.
    :param keep_schema_type: keep the ``schema_type`` attribute in the
        written records.
    :param json_error_messages: print collected errors to stdout as JSON lines.
    :return: 0 on success. Returns 1 if no token was given, the destination
        already exists, the collection is unknown, or any record failed.
    :raises ValueError: for an unsupported ``output_format`` (raised by
        ``_store_records``). Errors from the service helpers (e.g.
        ``HTTPError``, handled by ``cli``) propagate unchanged.
    """
    token = obj
    if token is None:
        console.print('[red]Error[/red]: no token provided')
        return 1

    # DESTINATION_DIR must not exist. Check before any network traffic so the
    # user gets a clear message instead of a FileExistsError traceback from
    # ``mkdir(exist_ok=False)`` below, which still guards against races.
    if destination.exists():
        console.print(f'[red]Error[/red]: destination already exists: {destination}')
        return 1

    session = get_session()
    server_info = server(service_url, session=session)
    collection_info = next(
        (c for c in server_info['collections'] if c['name'] == collection),
        None,
    )
    if not collection_info:
        console.print(f'[red]Error[/red]: no such collection: {collection}')
        return 1

    description = {
        # Timezone-aware local time, so the timestamp is unambiguous.
        'date': datetime.datetime.now().astimezone().isoformat(),
        'service': service_url,
        'name': collection,
        'schema': collection_info['schema'],
    }
    destination.mkdir(parents=True, exist_ok=False)
    # ``ensure_ascii=False`` can emit non-ASCII characters, so write UTF-8
    # explicitly instead of relying on the locale's default encoding.
    (destination / 'description.json').write_text(
        json.dumps(description, indent=2, ensure_ascii=False) + '\n',
        encoding='utf-8',
    )

    # Store the curated records
    curated_destination = destination / 'curated'
    curated_destination.mkdir()
    console.print('Exporting records from curated area')
    failed = _store_records(
        curated_read_records(
            service_url=service_url,
            collection=collection,
            token=token,
            session=session,
        ),
        curated_destination,
        output_format,
        ignore_errors,
        keep_schema_type,
        source_name='curated area',
    )

    # Store the incoming records, one sub-directory per incoming label
    for label in incoming_read_labels(
        service_url=service_url,
        collection=collection,
        token=token,
        session=session,
    ):
        console.print(f'Exporting records from incoming area: {label}')
        incoming_destination = destination / 'incoming' / label
        incoming_destination.mkdir(parents=True, exist_ok=False)
        failed.extend(
            _store_records(
                incoming_read_records(
                    service_url=service_url,
                    collection=collection,
                    label=label,
                    token=token,
                    session=session,
                ),
                incoming_destination,
                output_format,
                ignore_errors,
                keep_schema_type,
                source_name=f'incoming area: {label}',
            )
        )

    if failed:
        # stdout carries only the JSONL error report; all other output goes
        # to the stderr console.
        if json_error_messages:
            click.echo('\n'.join(failed))
        return 1
    return 0
def _store_records(
    source: Generator,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    source_name: str,
) -> list:
    """Write every record yielded by ``source`` below ``destination``.

    ``source`` yields 5-tuples. The first element is the record, a dict with
    at least a ``pid`` key. The fifth element is passed to the progress bar as
    the total number of records. Each record is stored as
    ``destination/<class>/<hash_dir>/<hash_name>.<ext>``, where ``<class>``
    is ``de_prefix(schema_type)`` and ``hash_dir``/``hash_name`` come from
    ``_hash_p3(pid)``.

    A record without ``schema_type`` is reported. So is a record whose
    directory or file cannot be created (``OSError``). A report means a
    console message plus a JSON error entry. Afterwards processing continues
    with the next record if ``ignore_errors`` is set; otherwise it stops for
    this source.

    :param source: generator of 5-tuples as described above.
    :param destination: directory that receives the records of this source.
    :param output_format: key into the module-level ``writer`` table.
    :param ignore_errors: continue after a failing record instead of stopping.
    :param keep_schema_type: if false, ``schema_type`` is removed from each
        record before writing (the class is part of the storage path).
    :param source_name: human-readable name of the source, used in error
        reports.
    :return: list of JSON-encoded error descriptions; empty on full success.
    :raises ValueError: if ``output_format`` has no registered writer.
    """
    from rich.markup import escape

    if output_format not in writer:
        msg = f'unsupported output format: {output_format}'
        raise ValueError(msg)

    created_dirs = set()
    failed = []

    # Peek at the first item: it carries the total record count needed by
    # the progress bar, and an empty source needs no further work.
    try:
        first_tuple = next(source)
    except StopIteration:
        return failed
    total = first_tuple[4]

    for record, _, _, _, _ in track(chain([first_tuple], source), total=total, console=console):
        schema_type = record.get('schema_type', None)
        if schema_type is None:
            _handle_schema_type_error(failed, record, source_name)
            if ignore_errors:
                continue
            break

        class_name = de_prefix(schema_type)
        if not keep_schema_type:
            del record['schema_type']

        pid = record['pid']
        hash_dir, hash_name = _hash_p3(pid)
        file_dir = destination / class_name / hash_dir
        try:
            # Create each directory only once per call; ``destination`` is
            # fresh, so ``exist_ok=False`` flags unexpected collisions.
            if file_dir not in created_dirs:
                file_dir.mkdir(parents=True, exist_ok=False)
                created_dirs.add(file_dir)
            writer[output_format](
                file_dir=file_dir,
                file_name=hash_name,
                record=record,
            )
        except OSError as e:
            # Report write failures (as promised by `--json-error-messages`)
            # instead of aborting the whole export with a traceback.
            console.print(
                f'[red]Error[/red]: could not write record '
                f'[red]{escape(str(pid))}[/red] from {escape(source_name)}: '
                f'{escape(str(e))}'
            )
            failed.append(
                json.dumps(
                    {
                        'status': 'error',
                        'operation': 'write',
                        'pid': pid,
                        'source': source_name,
                        'message': f'could not write record {pid}: {e}',
                    }
                )
            )
            if ignore_errors:
                continue
            break
    return failed
def _get_hex_digest(
data: str,
) -> str:
hash_context = hashlib.md5(data.encode())
return hash_context.hexdigest()
def _hash_p3(
    pid: str,
) -> tuple[str, str]:
    """Split the hex digest of ``pid`` into a directory part and a file part.

    The directory part is the first three hex characters of the digest, so
    records are spread over at most 16**3 = 4096 sub-directories. The file
    part is the remaining characters.
    """
    split_at = 3
    digest = _get_hex_digest(pid)
    directory_part = digest[:split_at]
    file_part = digest[split_at:]
    return directory_part, file_part
def _handle_schema_type_error(
    container: list,
    record: dict,
    source_name: str,
):
    """Report a record that has no ``schema_type`` attribute.

    Prints a human-readable error to the console and appends a JSON-encoded
    error description (keys ``status``, ``pid``, ``source``, ``message``) to
    ``container``.

    :param container: list that collects JSON-encoded error descriptions;
        mutated in place.
    :param record: the offending record; must contain a ``pid`` key.
    :param source_name: human-readable name of the area the record came from.
    """
    from rich.markup import escape

    pid = record['pid']
    # PIDs and area names are external data. Escape them so that square
    # brackets are printed literally instead of being parsed as rich markup.
    console.print(
        f'[red]Error[/red]: no `schema_type` in record '
        f'[red]{escape(str(pid))}[/red] in {escape(source_name)}'
    )
    container.append(
        json.dumps(
            {
                'status': 'error',
                'pid': pid,
                'source': source_name,
                'message': f'no `schema_type` in record {pid}',
            }
        )
    )
def write_json(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write ``record`` as indented JSON to ``file_dir / (file_name + '.json')``.

    Non-ASCII characters are kept verbatim (``ensure_ascii=False``), so the
    file is written explicitly as UTF-8. The platform's locale encoding might
    not be able to represent them.
    """
    (file_dir / (file_name + '.json')).write_text(
        json.dumps(record, indent=2, ensure_ascii=False) + '\n',
        encoding='utf-8',
    )
def write_yaml(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write ``record`` as block-style YAML to ``file_dir / (file_name + '.yaml')``.

    Keys keep their insertion order (``sort_keys=False``). Non-ASCII
    characters are emitted verbatim (``allow_unicode=True``), so the file is
    written explicitly as UTF-8 rather than in the locale's default encoding.
    """
    (file_dir / (file_name + '.yaml')).write_text(
        yaml.dump(
            data=record,
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
        ),
        encoding='utf-8',
    )
# Dispatch table: maps each value accepted by `--format` to the function
# that serializes one record in that format.
writer = dict(
    json=write_json,
    yaml=write_yaml,
)