Add export-subcommand to dtc #13
2 changed files with 205 additions and 0 deletions
188
dump_things_pyclient/commands/dtc_plugins/export.py
Normal file
188
dump_things_pyclient/commands/dtc_plugins/export.py
Normal file
|
|
@ -0,0 +1,188 @@
|
|||
import json
|
||||
from collections import defaultdict
|
||||
from itertools import count
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
Iterable,
|
||||
)
|
||||
|
||||
import rich_click as click
|
||||
|
||||
from ...communicate import (
|
||||
HTTPError,
|
||||
curated_read_records,
|
||||
incoming_read_labels,
|
||||
incoming_read_records,
|
||||
server,
|
||||
)
|
||||
|
||||
|
||||
subcommand_name = 'export'
|
||||
|
||||
|
||||
@click.command(short_help='Export a collection to the file system')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.argument(
    'destination',
    # NOTE: `exists=False` only means "do not require the path to exist"; it
    # does not reject an existing directory. That case is reported below by
    # handling `FileExistsError`.
    type=click.Path(
        exists=False,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='DESTINATION_DIR',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='ignore records with missing `schema_type` instead of raising an error',
)
def cli(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    ignore_errors: bool,
):
    """Export a collection to disk

    This command exports all records that are stored in curated area and in the
    incoming areas of collection COLLECTION of the dump-things service
    SERVICE_URL.

    Exported records are written to the directory DESTINATION_DIR.
    DESTINATION_DIR must not exist, `export` will create it.

    A token with curator rights has to be provided.
    """
    # The docstring above is the user-facing help text, so developer notes
    # live in comments: this wrapper delegates to `export()` and turns the
    # expected failure modes into an `ERROR: ...` line on stderr plus a return
    # value of 1. On success the return value of `export()` is passed through.
    try:
        return export(
            obj,
            service_url,
            collection,
            destination,
            ignore_errors,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
    except ValueError as e:
        click.echo(f'ERROR: {e}', err=True)
    except FileExistsError as e:
        # Directories are created with `exist_ok=False`; an already existing
        # DESTINATION_DIR therefore ends up here instead of in a traceback.
        click.echo(f'ERROR: path already exists: {e.filename}', err=True)
    return 1
|
||||
|
||||
|
||||
def export(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    ignore_errors: bool,
):
    """Export the curated and incoming records of a collection to disk.

    The following layout is created below `destination`:

    - `description.json`: the collection name and its `schema` entry as
      reported by the server information of the service,
    - `curated/`: the records returned by `curated_read_records`,
    - `incoming/<label>/`: the records returned by `incoming_read_records`
      for every label returned by `incoming_read_labels`.

    :param obj: the access token; `None` means that no token was provided
    :param service_url: the base URL of the service
    :param collection: the name of the collection to export
    :param destination: the directory to create and export into; it is
        created with `exist_ok=False`, so `FileExistsError` is raised if it
        already exists
    :param ignore_errors: passed on to `_store_records`

    :return: `0` on success, `1` if no token was provided or if the service
        does not report a collection named `collection`
    """
    token = obj

    if token is None:
        click.echo('ERROR: no token provided', err=True)
        return 1

    server_info = server(service_url)

    # Pick the first collection entry with a matching name. `next()` with a
    # default yields `None` for an unknown collection, so the error branch
    # below is reachable (indexing an empty result would raise instead).
    collection_info = next(
        (c for c in server_info['collections'] if c['name'] == collection),
        None,
    )

    if not collection_info:
        click.echo(f'ERROR: no collection {collection} on service', err=True)
        return 1

    description = {
        'name': collection,
        'schema': collection_info['schema'],
    }

    destination.mkdir(parents=True, exist_ok=False)
    # `ensure_ascii=False` emits non-ASCII characters verbatim, so the file
    # encoding is fixed to UTF-8 instead of depending on the locale.
    (destination / 'description.json').write_text(
        json.dumps(description, ensure_ascii=False),
        encoding='utf-8',
    )

    # Store the curated records
    curated_destination = destination / 'curated'
    curated_destination.mkdir()

    # Only element 0 of every item yielded by the reader is stored
    # (presumably the record itself -- confirm against `communicate`).
    curated_records = (
        item[0]
        for item in curated_read_records(
            service_url=service_url,
            collection=collection,
            token=token,
        )
    )
    _store_records(curated_records, curated_destination, ignore_errors)

    # Store the incoming records, one sub-directory per incoming label
    for label in incoming_read_labels(
        service_url=service_url,
        collection=collection,
        token=token,
    ):
        incoming_destination = destination / 'incoming' / label
        incoming_destination.mkdir(parents=True, exist_ok=False)

        incoming_records = (
            item[0]
            for item in incoming_read_records(
                service_url=service_url,
                collection=collection,
                label=label,
                token=token,
            )
        )
        _store_records(incoming_records, incoming_destination, ignore_errors)

    return 0
|
||||
|
||||
|
||||
def _store_records(
    source: Iterable,
    destination: Path,
    ignore_errors: bool = False,
):
    """Write every record of `source` into its own JSON file.

    Records are grouped by the value of their `schema_type` key. Within each
    group they are numbered consecutively, starting at zero, in the order in
    which they are read from `source`. The number is formatted with nine
    zero-padded digits; the first three digits name a sub-directory, the
    remaining six digits plus `.json` name the file, e.g.
    `<destination>/<schema_type>/000/000000.json`.

    :param source: an iterable of records; each record must support
        `.get()` and be JSON-serializable
    :param destination: the directory below which the files are created
    :param ignore_errors: if `True`, records without `schema_type` are
        reported as a warning on stderr and skipped; if `False`, the first
        such record raises `ValueError`

    :raises ValueError: if a record has no `schema_type` and `ignore_errors`
        is `False`
    """
    created_dirs = set()
    # One independent counter per class, created on first use.
    class_counters = defaultdict(count)

    for record in source:
        class_name = record.get('schema_type')
        if class_name is None:
            # Use `.get()` for the pid: a record that is missing
            # `schema_type` may be missing `pid` as well, and that must not
            # turn the intended warning/ValueError into a KeyError.
            msg = f'no `schema_type` in record `{record.get("pid")}`'
            if ignore_errors:
                click.echo(f'WARNING: {msg}', err=True)
                continue
            raise ValueError(msg)

        next_name_for_class = f'{next(class_counters[class_name]):09d}.json'
        file_dir = destination / class_name / next_name_for_class[:3]
        file_name = next_name_for_class[3:]

        # Remember created directories to avoid one `mkdir` call per record.
        if file_dir not in created_dirs:
            file_dir.mkdir(parents=True, exist_ok=False)
            created_dirs.add(file_dir)

        # `ensure_ascii=False` emits non-ASCII characters verbatim, so the
        # file encoding is fixed to UTF-8 instead of depending on the locale.
        (file_dir / file_name).write_text(
            json.dumps(record, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )
|
||||
|
|
@ -677,6 +677,23 @@ def incoming_delete_record(
|
|||
params={'pid': pid})
|
||||
|
||||
|
||||
def server(
    service_url: str,
) -> JSON:
    """Fetch the server information of a service

    :param service_url: the base URL of the service, i.e., the URL up to
        `/<collection>/...` or `/server`; a single trailing `/` is ignored

    :return: whatever the `<service_url>/server` endpoint returns
    """
    # Drop at most one trailing slash so that the joined URL does not contain
    # `//server`.
    base_url = service_url.removesuffix('/')
    return _do_request(
        requests.get,
        url=base_url + '/server',
        token=None,
        params=None,
    )
|
||||
|
||||
|
||||
def _get_from_url(url: str,
|
||||
token: str | None,
|
||||
params: dict[str, str] | None = None,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue