Add dtc subcommand import #30

Merged
cmo merged 4 commits from import into master 2026-02-09 14:13:17 +00:00
3 changed files with 264 additions and 9 deletions

View file

@ -1,11 +1,8 @@
import datetime
import hashlib
import json
import sys
from collections import defaultdict
from itertools import (
chain,
count,
)
from itertools import chain
from pathlib import Path
from typing import (
Any,
@ -106,7 +103,7 @@ def cli(
)
except HTTPError as e:
console.print(f'[red]Error[/red]: {e}: {e.response.text}')
except ValueError as e:
except Exception as e:
console.print(f'[red]Error[/red]: {e}')
return 1
@ -135,13 +132,15 @@ def export(
return 1
description = {
'date': datetime.datetime.now().isoformat(),
'service': service_url,
'name': collection,
'schema': collection_info['schema'],
}
destination.mkdir(parents=True, exist_ok=False)
(destination / 'description.json').write_text(
json.dumps(description, ensure_ascii=False),
json.dumps(description, indent=2, ensure_ascii=False) + '\n',
)
# Store the curated records
@ -200,14 +199,12 @@ def _store_records(
source_name: str,
):
created_dirs = set()
class_counters = defaultdict(count)
# Get the first result from the source to determine the total number
# of records.
try:
first_tuple = next(source)
except StopIteration:
console.print(f'no records in incoming [green]{source_name}[/green], skipping it')
return
total = first_tuple[4]

View file

@ -0,0 +1,257 @@
import json
import sys
from functools import (
partial,
reduce,
)
from pathlib import Path
from typing import (
Any,
Callable,
)
import rich_click as click
import yaml
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
Session,
curated_write_record,
get_session,
incoming_write_record,
)
subcommand_name = 'import'
console = Console(file=sys.stderr)
@click.command(short_help='Import a collection from the file system')
@click.pass_obj
@click.argument(
'source',
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
writable=True,
allow_dash=False,
path_type=Path,
),
metavar='SOURCE_DIR',
)
@click.option(
'--service-url', '-s',
metavar='SERVICE_URL',
default=None,
help='use the service SERVICE_URL instead of the service URL that is '
'stored in `SOURCE_DIR/description.json`',
)
@click.option(
'--collection', '-c',
metavar='COLLECTION',
default=None,
help='use the collection name COLLECTION instead of the collection name '
'that is stored in `SOURCE_DIR/description.json`',
)
@click.option(
'--ignore-errors',
default=False,
is_flag=True,
help='log errors an continue import instead of raising an exception',
)
def cli(
obj: Any,
source: Path,
service_url: str,
collection: str,
ignore_errors,
):
"""Import a collection from disk
This command imports all records that are stored on disk in the directory
SOURCE_DIR in the format that is created by `dtc export`. The records are
stored in the dump-things service and the collection that are recorded in
`SOURCE_DIR/description.json`.
A token with curator rights has to be provided.
"""
try:
return import_collection(
obj,
service_url,
collection,
source,
ignore_errors,
)
except HTTPError as e:
click.echo(f'ERROR: {e}: {e.response.text}', err=True)
except ValueError as e:
click.echo(f'ERROR: {e}', err=True)
return 1
def import_collection(
token: str,
service_url: str,
collection: str,
source: Path,
ignore_errors: bool,
):
if token is None:
click.echo(f'ERROR: no token provided', err=True)
return 1
with (source / 'description.json').open() as description_file:
description = json.load(description_file)
session = get_session()
# Import the curated records
curated_source = source / 'curated'
result = _load_records(
service_url=service_url or description['service_url'],
collection=collection or description['name'],
token=token,
source=curated_source,
writer=curated_write_record,
location_info='curated area',
ignore_errors=ignore_errors,
session=session,
)
if result != 0 and not ignore_errors:
return result
# Import incoming areas
errors = 0
for incoming_source in (source / 'incoming').glob('*'):
label = incoming_source.name
console.print(f'processing incoming area {label}')
result = _load_records(
service_url,
collection,
token,
incoming_source,
partial(incoming_write_record, label=label),
location_info=f'incoming area {label}',
session=session,
ignore_errors=ignore_errors,
)
if result != 0:
if ignore_errors:
errors += 1
else:
return result
return int(errors > 0)
def _load_records(
service_url: str,
collection: str,
token: str,
source: Path,
writer: Callable,
location_info: str,
session: Session,
ignore_errors: bool = False,
) -> int:
errors = 0
for path in source.glob('*'):
class_name = path.parts[-1]
result = _load_class_records(
service_url=service_url,
collection=collection,
token=token,
source=path,
writer=writer,
class_name=class_name,
location_info=location_info,
session=session,
ignore_errors=ignore_errors,
)
if result != 0:
if ignore_errors:
errors += 1
else:
return result
return int(errors > 0)
def _load_class_records(
service_url: str,
collection: str,
token: str,
source: Path,
writer: Callable,
class_name: str,
location_info: str,
session: Session,
ignore_errors: bool = False,
) -> int:
total = reduce(
lambda s, path: s + (0 if path.is_dir() else 1),
source.glob('**/*'),
0,
)
for path in track(
source.glob('**/*'),
description = f'processing {location_info}, class {class_name} ... ',
console=console,
total=total,
):
if path.is_dir():
continue
file_format = path.suffix[1:]
try:
with path.open('rt') as file:
record = _file_reader[file_format.lower()](file)
except Exception as e:
console.print(f'[red]Error[/red]: could not read {file_format}-file: {path}`: {e}')
if not ignore_errors:
return 1
continue
try:
writer(
service_url=service_url,
collection=collection,
class_name=class_name,
record=record,
token=token,
session=session,
)
except HTTPError as e:
console.print(f'[red]Error[/red]: could not write record with pid [green]{record["pid"]}[/green] of class `{class_name}` (file: [yellow]{path}[/yellow]) to: "{location_info}": {e} {e.response.text}')
if not ignore_errors:
return 1
except Exception as e:
console.print(f'[red]Error[/red]: could not write record with pid [green]{record["pid"]}[/green] of class `{class_name}` (file: [yellow]{path}[/yellow]) to: "{location_info}": {e}')
if not ignore_errors:
return 1
return 0
def _json_reader(
file
):
return json.load(file)
def _yaml_reader(
file
):
return yaml.load(file, Loader=yaml.SafeLoader)
_file_reader = {
'json': _json_reader,
'yaml': _yaml_reader,
}

View file

@ -41,6 +41,7 @@ __all__ = [
'incoming_write_record',
'maintenance',
'server',
'Session',
]