Add export-subcommand to dtc #13

Merged
cmo merged 2 commits from export into master 2026-01-28 12:10:53 +00:00
2 changed files with 205 additions and 0 deletions

View file

@ -0,0 +1,188 @@
import json
from collections import defaultdict
from itertools import count
from pathlib import Path
from typing import (
Any,
Iterable,
)
import rich_click as click
from ...communicate import (
HTTPError,
curated_read_records,
incoming_read_labels,
incoming_read_records,
server,
)
subcommand_name = 'export'
@click.command(short_help='Export a collection to the file system')
@click.pass_obj
@click.argument(
'service_url',
metavar='SERVICE_URL',
)
@click.argument(
'collection',
metavar='COLLECTION',
)
@click.argument(
'destination',
type=click.Path(
exists=False,
file_okay=False,
dir_okay=True,
writable=True,
allow_dash=False,
path_type=Path,
),
metavar='DESTINATION_DIR',
)
@click.option(
'--ignore-errors',
default=False,
is_flag=True,
help='ignore records with missing `schema_type` instead of raising an error',
)
def cli(
obj: Any,
service_url: str,
collection: str,
destination: Path,
ignore_errors,
):
"""Export a collection to disk
This command exports all records that are stored in curated area and in the
incoming areas of collection COLLECTION of the dump-things service
SERVICE_URL.
Exported records are written to the directory DESTINATION_DIR.
DESTINATION_DIR must not exist, `export` will create it.
A token with curator rights has to be provided.
"""
try:
return export(
obj,
service_url,
collection,
destination,
ignore_errors,
)
except HTTPError as e:
click.echo(f'ERROR: {e}: {e.response.text}', err=True)
except ValueError as e:
click.echo(f'ERROR: {e}', err=True)
return 1
def export(
obj: Any,
service_url: str,
collection: str,
destination: Path,
ignore_errors: bool,
):
token = obj
if token is None:
click.echo(f'ERROR: no token provided', err=True)
return 1
server_info = server(service_url)
collection_info = ([c for c in server_info['collections'] if c['name'] == collection] or None)[0]
if not collection_info:
click.echo(f'ERROR: no collection {collection} on service', err=True)
return 1
description = {
'name': collection,
'schema': collection_info['schema'],
}
destination.mkdir(parents=True, exist_ok=False)
(destination / 'description.json').write_text(
json.dumps(description, ensure_ascii=False),
)
# Store the curated records
curated_destination = destination / 'curated'
curated_destination.mkdir()
_store_records(
map(
lambda x: x[0],
curated_read_records(
service_url=service_url,
collection=collection,
token=token,
)
),
curated_destination,
ignore_errors,
)
# Store the incoming records
for label in incoming_read_labels(
service_url=service_url,
collection=collection,
token=token,
):
incoming_destination = destination / 'incoming' / label
incoming_destination.mkdir(parents=True, exist_ok=False)
_store_records(
map(
lambda x: x[0],
incoming_read_records(
service_url=service_url,
collection=collection,
label=label,
token=token,
)
),
incoming_destination,
ignore_errors,
)
return 0
def _store_records(
source: Iterable,
destination: Path,
ignore_errors: bool = False,
):
created_dirs = set()
class_counters = defaultdict(count)
for record in source:
class_name = record.get('schema_type', None)
if class_name is None:
if ignore_errors:
click.echo(
f'WARNING: no `schema_type` in record `{record["pid"]}`',
err=True
)
continue
msg = f'no `schema_type` in record `{record["pid"]}`'
raise ValueError(msg)
next_name_for_class = f'{next(class_counters[class_name]):09d}.json'
file_dir, file_name = (
destination / class_name / next_name_for_class[:3],
next_name_for_class[3:]
)
if file_dir not in created_dirs:
file_dir.mkdir(parents=True, exist_ok=False)
created_dirs.add(file_dir)
(file_dir / file_name).write_text(
json.dumps(record, indent=2, ensure_ascii=False),
)

View file

@ -677,6 +677,23 @@ def incoming_delete_record(
params={'pid': pid}) params={'pid': pid})
def server(
service_url: str,
) -> JSON:
"""Get server-information from the service
:param service_url: the base URL of the service, i.e., the URL up to
`/<collection>/...` or `/server`
:return: information returned by the `<service_url>/server` endpoint
"""
url = (
(f'{service_url[:-1]}' if service_url.endswith('/') else service_url)
+ '/server'
)
return _do_request(requests.get, url=url, token=None, params=None)
def _get_from_url(url: str, def _get_from_url(url: str,
token: str | None, token: str | None,
params: dict[str, str] | None = None, params: dict[str, str] | None = None,