Add dtc subcommand import #30
3 changed files with 264 additions and 9 deletions
|
|
@ -1,11 +1,8 @@
|
|||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from itertools import (
|
||||
chain,
|
||||
count,
|
||||
)
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
|
|
@ -106,7 +103,7 @@ def cli(
|
|||
)
|
||||
except HTTPError as e:
|
||||
console.print(f'[red]Error[/red]: {e}: {e.response.text}')
|
||||
except ValueError as e:
|
||||
except Exception as e:
|
||||
console.print(f'[red]Error[/red]: {e}')
|
||||
return 1
|
||||
|
||||
|
|
@ -135,13 +132,15 @@ def export(
|
|||
return 1
|
||||
|
||||
description = {
|
||||
'date': datetime.datetime.now().isoformat(),
|
||||
'service': service_url,
|
||||
'name': collection,
|
||||
'schema': collection_info['schema'],
|
||||
}
|
||||
|
||||
destination.mkdir(parents=True, exist_ok=False)
|
||||
(destination / 'description.json').write_text(
|
||||
json.dumps(description, ensure_ascii=False),
|
||||
json.dumps(description, indent=2, ensure_ascii=False) + '\n',
|
||||
)
|
||||
|
||||
# Store the curated records
|
||||
|
|
@ -200,14 +199,12 @@ def _store_records(
|
|||
source_name: str,
|
||||
):
|
||||
created_dirs = set()
|
||||
class_counters = defaultdict(count)
|
||||
|
||||
# Get the first result from the source to determine the total number
|
||||
# of records.
|
||||
try:
|
||||
first_tuple = next(source)
|
||||
except StopIteration:
|
||||
console.print(f'no records in incoming [green]{source_name}[/green], skipping it')
|
||||
return
|
||||
|
||||
total = first_tuple[4]
|
||||
|
|
|
|||
257
dump_things_pyclient/commands/dtc_plugins/import.py
Normal file
257
dump_things_pyclient/commands/dtc_plugins/import.py
Normal file
|
|
@ -0,0 +1,257 @@
|
|||
import json
|
||||
import sys
|
||||
from functools import (
|
||||
partial,
|
||||
reduce,
|
||||
)
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
)
|
||||
|
||||
import rich_click as click
|
||||
import yaml
|
||||
from rich.console import Console
|
||||
from rich.progress import track
|
||||
|
||||
from ...communicate import (
|
||||
HTTPError,
|
||||
Session,
|
||||
curated_write_record,
|
||||
get_session,
|
||||
incoming_write_record,
|
||||
)
|
||||
|
||||
subcommand_name = 'import'
|
||||
|
||||
console = Console(file=sys.stderr)
|
||||
|
||||
|
||||
@click.command(short_help='Import a collection from the file system')
@click.pass_obj
@click.argument(
    'source',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        writable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='SOURCE_DIR',
)
@click.option(
    '--service-url', '-s',
    metavar='SERVICE_URL',
    default=None,
    help='use the service SERVICE_URL instead of the service URL that is '
         'stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--collection', '-c',
    metavar='COLLECTION',
    default=None,
    help='use the collection name COLLECTION instead of the collection name '
         'that is stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='log errors and continue import instead of raising an exception',
)
def cli(
    obj: Any,
    source: Path,
    service_url: str,
    collection: str,
    ignore_errors: bool,
):
    """Import a collection from disk

    This command imports all records that are stored on disk in the directory
    SOURCE_DIR in the format that is created by `dtc export`. The records are
    stored in the dump-things service and the collection that are recorded in
    `SOURCE_DIR/description.json`.

    A token with curator rights has to be provided.
    """
    # `obj` carries the token that was collected by the top-level CLI group.
    try:
        return import_collection(
            obj,
            service_url,
            collection,
            source,
            ignore_errors,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
    except ValueError as e:
        click.echo(f'ERROR: {e}', err=True)
    # Any handled exception ends the command with a non-zero exit status.
    return 1
|
||||
|
||||
|
||||
def import_collection(
    token: str,
    service_url: str,
    collection: str,
    source: Path,
    ignore_errors: bool,
):
    """Import an exported collection from `source` into a dump-things service.

    Reads `source/description.json` to determine the default service URL and
    collection name, writes the records below `source/curated` to the curated
    area, and the records below each `source/incoming/<label>` directory to
    the corresponding incoming area.

    Returns 0 on success, a non-zero value on error. Requires `token` to be
    a token with curator rights.
    """
    if token is None:
        click.echo('ERROR: no token provided', err=True)
        return 1

    with (source / 'description.json').open() as description_file:
        description = json.load(description_file)

    # Resolve the destination once so the curated area and all incoming
    # areas are written to the same service/collection. The exporter stores
    # the service URL under the key 'service'; accept 'service_url' as well
    # for robustness against differently written description files.
    service_url = service_url or description.get(
        'service_url',
        description.get('service'),
    )
    collection = collection or description['name']

    session = get_session()

    # Import the curated records first.
    curated_source = source / 'curated'
    result = _load_records(
        service_url=service_url,
        collection=collection,
        token=token,
        source=curated_source,
        writer=curated_write_record,
        location_info='curated area',
        ignore_errors=ignore_errors,
        session=session,
    )
    if result != 0 and not ignore_errors:
        return result

    # Import incoming areas, one subdirectory per area label.
    errors = 0
    for incoming_source in (source / 'incoming').glob('*'):
        label = incoming_source.name
        console.print(f'processing incoming area {label}')

        result = _load_records(
            service_url=service_url,
            collection=collection,
            token=token,
            source=incoming_source,
            # Bind the area label so the writer targets the right area.
            writer=partial(incoming_write_record, label=label),
            location_info=f'incoming area {label}',
            session=session,
            ignore_errors=ignore_errors,
        )

        if result != 0:
            if ignore_errors:
                errors += 1
            else:
                return result

    return int(errors > 0)
|
||||
|
||||
|
||||
def _load_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    location_info: str,
    session: Session,
    ignore_errors: bool = False,
) -> int:
    """Write all records stored below `source` via `writer`.

    Each direct child of `source` is expected to be a directory named after
    a record class; its contents are handed to `_load_class_records`.
    Returns 0 on success. With `ignore_errors` set, failures are counted and
    reported as a single non-zero result; otherwise the first failing class
    aborts the import and its result is returned.
    """
    error_count = 0
    for class_path in source.glob('*'):
        status = _load_class_records(
            service_url=service_url,
            collection=collection,
            token=token,
            source=class_path,
            writer=writer,
            class_name=class_path.parts[-1],
            location_info=location_info,
            session=session,
            ignore_errors=ignore_errors,
        )
        if status == 0:
            continue
        if not ignore_errors:
            return status
        error_count += 1

    return int(error_count > 0)
|
||||
|
||||
|
||||
def _load_class_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    class_name: str,
    location_info: str,
    session: Session,
    ignore_errors: bool = False,
) -> int:
    """Read every record file below `source` and store it via `writer`.

    The file suffix (e.g. `.json`, `.yaml`) selects the parser from
    `_file_reader`. Returns 0 on success; with `ignore_errors` unset, the
    first read- or write-failure aborts with 1, otherwise failures are
    logged and processing continues.
    """
    # Count the files up front so the progress bar knows the total.
    total = sum(1 for path in source.glob('**/*') if not path.is_dir())
    for path in track(
        source.glob('**/*'),
        description=f'processing {location_info}, class {class_name} ... ',
        console=console,
        total=total,
    ):
        if path.is_dir():
            continue

        # Suffix without the leading dot, e.g. 'json' or 'yaml'.
        file_format = path.suffix[1:]
        try:
            with path.open('rt') as file:
                record = _file_reader[file_format.lower()](file)
        except Exception as e:
            # Also triggered by an unknown suffix (KeyError on the lookup).
            console.print(f'[red]Error[/red]: could not read {file_format}-file `{path}`: {e}')
            if not ignore_errors:
                return 1
            continue

        try:
            writer(
                service_url=service_url,
                collection=collection,
                class_name=class_name,
                record=record,
                token=token,
                session=session,
            )
        except HTTPError as e:
            console.print(f'[red]Error[/red]: could not write record with pid [green]{record["pid"]}[/green] of class `{class_name}` (file: [yellow]{path}[/yellow]) to: "{location_info}": {e} {e.response.text}')
            if not ignore_errors:
                return 1
        except Exception as e:
            console.print(f'[red]Error[/red]: could not write record with pid [green]{record["pid"]}[/green] of class `{class_name}` (file: [yellow]{path}[/yellow]) to: "{location_info}": {e}')
            if not ignore_errors:
                return 1
    return 0
|
||||
|
||||
|
||||
def _json_reader(
|
||||
file
|
||||
):
|
||||
return json.load(file)
|
||||
|
||||
|
||||
def _yaml_reader(
    file
):
    """Parse one YAML-encoded record from the open text file `file`."""
    # safe_load is shorthand for yaml.load(file, Loader=yaml.SafeLoader):
    # it refuses arbitrary Python object construction on untrusted input.
    return yaml.safe_load(file)
|
||||
|
||||
|
||||
# Map a lower-cased file suffix (without the dot) onto the reader that can
# parse a record file in that format. `.yml` is accepted as the common
# alternative suffix for YAML files.
_file_reader = {
    'json': _json_reader,
    'yaml': _yaml_reader,
    'yml': _yaml_reader,
}
|
||||
|
|
@ -41,6 +41,7 @@ __all__ = [
|
|||
'incoming_write_record',
|
||||
'maintenance',
|
||||
'server',
|
||||
'Session',
|
||||
]
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue