Add dtc subcommand import #30
3 changed files with 264 additions and 9 deletions
|
|
@ -1,11 +1,8 @@
|
||||||
|
import datetime
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
from collections import defaultdict
|
from itertools import chain
|
||||||
from itertools import (
|
|
||||||
chain,
|
|
||||||
count,
|
|
||||||
)
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
|
@ -106,7 +103,7 @@ def cli(
|
||||||
)
|
)
|
||||||
except HTTPError as e:
|
except HTTPError as e:
|
||||||
console.print(f'[red]Error[/red]: {e}: {e.response.text}')
|
console.print(f'[red]Error[/red]: {e}: {e.response.text}')
|
||||||
except ValueError as e:
|
except Exception as e:
|
||||||
console.print(f'[red]Error[/red]: {e}')
|
console.print(f'[red]Error[/red]: {e}')
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
@ -135,13 +132,15 @@ def export(
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
description = {
|
description = {
|
||||||
|
'date': datetime.datetime.now().isoformat(),
|
||||||
|
'service': service_url,
|
||||||
'name': collection,
|
'name': collection,
|
||||||
'schema': collection_info['schema'],
|
'schema': collection_info['schema'],
|
||||||
}
|
}
|
||||||
|
|
||||||
destination.mkdir(parents=True, exist_ok=False)
|
destination.mkdir(parents=True, exist_ok=False)
|
||||||
(destination / 'description.json').write_text(
|
(destination / 'description.json').write_text(
|
||||||
json.dumps(description, ensure_ascii=False),
|
json.dumps(description, indent=2, ensure_ascii=False) + '\n',
|
||||||
)
|
)
|
||||||
|
|
||||||
# Store the curated records
|
# Store the curated records
|
||||||
|
|
@ -200,14 +199,12 @@ def _store_records(
|
||||||
source_name: str,
|
source_name: str,
|
||||||
):
|
):
|
||||||
created_dirs = set()
|
created_dirs = set()
|
||||||
class_counters = defaultdict(count)
|
|
||||||
|
|
||||||
# Get the first result from the source to determine the total number
|
# Get the first result from the source to determine the total number
|
||||||
# of records.
|
# of records.
|
||||||
try:
|
try:
|
||||||
first_tuple = next(source)
|
first_tuple = next(source)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
console.print(f'no records in incoming [green]{source_name}[/green], skipping it')
|
|
||||||
return
|
return
|
||||||
|
|
||||||
total = first_tuple[4]
|
total = first_tuple[4]
|
||||||
|
|
|
||||||
257
dump_things_pyclient/commands/dtc_plugins/import.py
Normal file
257
dump_things_pyclient/commands/dtc_plugins/import.py
Normal file
|
|
@ -0,0 +1,257 @@
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from functools import (
|
||||||
|
partial,
|
||||||
|
reduce,
|
||||||
|
)
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
|
|
||||||
|
import rich_click as click
|
||||||
|
import yaml
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.progress import track
|
||||||
|
|
||||||
|
from ...communicate import (
|
||||||
|
HTTPError,
|
||||||
|
Session,
|
||||||
|
curated_write_record,
|
||||||
|
get_session,
|
||||||
|
incoming_write_record,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Name of this subcommand. NOTE(review): presumably read by the dtc plugin
# loader to register the command -- confirm against the loader.
subcommand_name = 'import'
|
||||||
|
|
||||||
|
# Rich console bound to stderr; used in this module for status messages,
# error messages, and the progress display.
console = Console(file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
|
@click.command(short_help='Import a collection from the file system')
@click.pass_obj
@click.argument(
    'source',
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        # The import only reads from SOURCE_DIR, so require read access
        # instead of write access (read-only exports must be importable).
        readable=True,
        allow_dash=False,
        path_type=Path,
    ),
    metavar='SOURCE_DIR',
)
@click.option(
    '--service-url', '-s',
    metavar='SERVICE_URL',
    default=None,
    help='use the service SERVICE_URL instead of the service URL that is '
         'stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--collection', '-c',
    metavar='COLLECTION',
    default=None,
    help='use the collection name COLLECTION instead of the collection name '
         'that is stored in `SOURCE_DIR/description.json`',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='log errors and continue import instead of raising an exception',
)
def cli(
    obj: Any,
    source: Path,
    service_url: str,
    collection: str,
    ignore_errors: bool,
):
    """Import a collection from disk

    This command imports all records that are stored on disk in the directory
    SOURCE_DIR in the format that is created by `dtc export`. The records are
    stored in the dump-things service and the collection that are recorded in
    `SOURCE_DIR/description.json`.

    A token with curator rights has to be provided.
    """
    # `obj` is the click context object; it is handed to `import_collection`
    # as the token. `service_url` and `collection` are `None` unless the
    # corresponding option was given on the command line.
    try:
        return import_collection(
            obj,
            service_url,
            collection,
            source,
            ignore_errors,
        )
    except HTTPError as e:
        # HTTP errors carry the response of the service; show its body too.
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
    except (ValueError, OSError) as e:
        # ValueError covers malformed JSON in `description.json`; OSError
        # covers a missing or unreadable `description.json`.
        click.echo(f'ERROR: {e}', err=True)
    # Only reached after one of the handlers above ran.
    return 1
|
||||||
|
|
||||||
|
|
||||||
|
def import_collection(
    token: str,
    service_url: str,
    collection: str,
    source: Path,
    ignore_errors: bool,
):
    """Import the curated area and all incoming areas stored under `source`.

    :param token: token used for all write requests; `None` aborts the import
    :param service_url: service to import into; if falsy, the value recorded
        in `source/description.json` is used
    :param collection: collection to import into; if falsy, the name recorded
        in `source/description.json` is used
    :param source: directory containing `description.json`, `curated/`, and
        optionally `incoming/<label>/`
    :param ignore_errors: if true, continue after errors and only report them
        in the return value
    :return: `0` if everything was imported, a non-zero status otherwise
    :raises ValueError: if `description.json` is not a JSON object or if
        neither the arguments nor `description.json` provide a service URL
        or a collection name
    """
    if token is None:
        click.echo('ERROR: no token provided', err=True)
        return 1

    with (source / 'description.json').open() as description_file:
        description = json.load(description_file)

    if not isinstance(description, dict):
        raise ValueError(
            f'{source / "description.json"} does not contain a JSON object'
        )

    # `dtc export` records the service under the key 'service'. The key
    # 'service_url' is accepted as a fallback for hand-written descriptions.
    effective_service_url = (
        service_url
        or description.get('service')
        or description.get('service_url')
    )
    if not effective_service_url:
        raise ValueError(
            'no service URL given and none recorded in '
            f'{source / "description.json"}'
        )

    effective_collection = collection or description.get('name')
    if not effective_collection:
        raise ValueError(
            'no collection name given and none recorded in '
            f'{source / "description.json"}'
        )

    session = get_session()

    # Import the curated records
    result = _load_records(
        service_url=effective_service_url,
        collection=effective_collection,
        token=token,
        source=source / 'curated',
        writer=curated_write_record,
        location_info='curated area',
        session=session,
        ignore_errors=ignore_errors,
    )
    if result != 0 and not ignore_errors:
        return result

    # A curated-area failure that was ignored still counts for the final
    # status.
    errors = int(result != 0)

    # Import incoming areas; every sub-directory of `incoming` is one area
    # and its name is the label of that area.
    for incoming_source in (source / 'incoming').glob('*'):
        label = incoming_source.name
        console.print(f'processing incoming area {label}')

        # Use the resolved service URL and collection here as well, the raw
        # arguments may be `None`.
        result = _load_records(
            service_url=effective_service_url,
            collection=effective_collection,
            token=token,
            source=incoming_source,
            writer=partial(incoming_write_record, label=label),
            location_info=f'incoming area {label}',
            session=session,
            ignore_errors=ignore_errors,
        )

        if result != 0:
            if not ignore_errors:
                return result
            errors += 1

    return int(errors > 0)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    location_info: str,
    session: Session,
    ignore_errors: bool = False,
) -> int:
    """Load the records of every class directory found directly in `source`.

    Each entry of `source` is handed to `_load_class_records`, with the
    entry's name used as class name. Without `ignore_errors` the first
    non-zero status is returned immediately. With `ignore_errors` all entries
    are processed and `1` is returned if any of them reported a non-zero
    status, `0` otherwise.
    """
    any_failed = False
    for class_dir in source.glob('*'):
        status = _load_class_records(
            service_url=service_url,
            collection=collection,
            token=token,
            source=class_dir,
            writer=writer,
            class_name=class_dir.name,
            location_info=location_info,
            session=session,
            ignore_errors=ignore_errors,
        )
        if status == 0:
            continue
        if not ignore_errors:
            return status
        any_failed = True

    return 1 if any_failed else 0
|
||||||
|
|
||||||
|
|
||||||
|
def _load_class_records(
    service_url: str,
    collection: str,
    token: str,
    source: Path,
    writer: Callable,
    class_name: str,
    location_info: str,
    session: Session,
    ignore_errors: bool = False,
) -> int:
    """Read all record files below `source` and write them with `writer`.

    Every non-directory entry below `source` (recursively) is parsed with the
    reader registered in `_file_reader` for its lower-cased file suffix and
    then passed to `writer` together with the connection parameters.

    :param source: directory that holds the record files of one class
    :param writer: callable invoked with the keyword arguments `service_url`,
        `collection`, `class_name`, `record`, `token`, and `session`
    :param class_name: class name passed to `writer` and shown in messages
    :param location_info: human-readable target description for messages
    :param ignore_errors: if true, log errors and continue with the next file
    :return: `0` if all files were imported, `1` if at least one file could
        not be read or written
    """
    # Collect the files once: the progress total then matches exactly what is
    # iterated (directories no longer advance the progress display) and the
    # tree is only traversed a single time.
    record_paths = [
        path
        for path in source.glob('**/*')
        if not path.is_dir()
    ]

    errors = 0
    for path in track(
        record_paths,
        description=f'processing {location_info}, class {class_name} ... ',
        console=console,
        total=len(record_paths),
    ):
        file_format = path.suffix[1:]
        try:
            with path.open('rt') as file:
                # An unsupported suffix raises `KeyError` here and is
                # reported like any other read error.
                record = _file_reader[file_format.lower()](file)
        except Exception as e:
            console.print(f'[red]Error[/red]: could not read {file_format}-file: {path}: {e}')
            if not ignore_errors:
                return 1
            errors += 1
            continue

        try:
            writer(
                service_url=service_url,
                collection=collection,
                class_name=class_name,
                record=record,
                token=token,
                session=session,
            )
        except Exception as e:
            # HTTP errors carry the response of the service; show its body.
            if isinstance(e, HTTPError):
                detail = f'{e} {e.response.text}'
            else:
                detail = f'{e}'
            # The record may not be a mapping or may lack a pid; do not let
            # the error report itself fail and hide the original error.
            pid = record.get('pid', '<unknown>') if isinstance(record, dict) else '<unknown>'
            console.print(f'[red]Error[/red]: could not write record with pid [green]{pid}[/green] of class `{class_name}` (file: [yellow]{path}[/yellow]) to: "{location_info}": {detail}')
            if not ignore_errors:
                return 1
            errors += 1

    # Report ignored errors to the caller so that the overall status
    # reflects them.
    return int(errors > 0)
|
||||||
|
|
||||||
|
|
||||||
|
def _json_reader(file):
    """Return the object decoded from the JSON document in the open `file`."""
    content = file.read()
    return json.loads(content)
|
||||||
|
|
||||||
|
|
||||||
|
def _yaml_reader(file):
    """Return the object decoded from the YAML document in the open `file`.

    The document is parsed with PyYAML's safe loader.
    """
    return yaml.safe_load(file)
|
||||||
|
|
||||||
|
|
||||||
|
# Maps a lower-cased file suffix (without the leading dot) to the function
# that parses an open file of that format.
_file_reader = dict(
    json=_json_reader,
    yaml=_yaml_reader,
)
|
||||||
|
|
@ -41,6 +41,7 @@ __all__ = [
|
||||||
'incoming_write_record',
|
'incoming_write_record',
|
||||||
'maintenance',
|
'maintenance',
|
||||||
'server',
|
'server',
|
||||||
|
'Session',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue