Tackle issues reported in issue #14 (#28)
3 changed files with 127 additions and 32 deletions
@@ -1,14 +1,19 @@
import hashlib
import json
import sys
from collections import defaultdict
from itertools import count
from itertools import (
    chain,
    count,
)
from pathlib import Path
from typing import (
    Any,
    Iterable,
    Generator,
)

import rich_click as click
import yaml
from rich.console import Console
from rich.progress import track

@@ -49,18 +54,34 @@ console = Console(file=sys.stderr)
    ),
    metavar='DESTINATION_DIR',
)
@click.option(
    '--format', '-f', 'output_format',
    type=click.Choice(('json', 'yaml'), case_sensitive=True),
    default='json',
    help='select output format for the exported records (default: json)',
)
@click.option(
    '--ignore-errors',
    default=False,
    is_flag=True,
    help='ignore records with missing `schema_type` instead of raising an error',
)
@click.option(
    '--keep-schema-type', '-k',
    default=False,
    is_flag=True,
    help='keep `schema_type`-attribute in records on file-system. By default the '
         'schema_type-attribute is removed because the class is encoded in the '
         'storage path of the records.'
)
def cli(
    obj: Any,
    service_url: str,
    collection: str,
    destination: Path,
    ignore_errors,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
):
    """Export a collection to disk

@@ -79,7 +100,9 @@ def cli(
            service_url,
            collection,
            destination,
            output_format,
            ignore_errors,
            keep_schema_type,
        )
    except HTTPError as e:
        console.print(f'[red]Error[/red]: {e}: {e.response.text}')

@@ -93,7 +116,9 @@ def export(
    service_url: str,
    collection: str,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
):
    token = obj

@@ -125,17 +150,17 @@ def export(

    console.print('Exporting records from curated area')
    _store_records(
        map(
            lambda x: x[0],
            curated_read_records(
                service_url=service_url,
                collection=collection,
                token=token,
                session=session,
            )
        curated_read_records(
            service_url=service_url,
            collection=collection,
            token=token,
            session=session,
        ),
        curated_destination,
        output_format,
        ignore_errors,
        keep_schema_type,
        source_name='curated area',
    )

    # Store the incoming records
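
Note on the hunk above and the one below: `_store_records` is now handed the raw generator returned by `curated_read_records()`/`incoming_read_records()` instead of a `map(lambda x: x[0], ...)` projection over it, so it can peek at the first yielded tuple and read the record total from it (`total = first_tuple[4]` in the next hunk) before iterating. Here is a minimal sketch of that peek-and-chain pattern; the stand-in generator, the 5-tuple layout, and the sample pids are illustrative assumptions, not part of the PR:

```python
from itertools import chain


def fake_read_records():
    # Stand-in for curated_read_records()/incoming_read_records():
    # each item is a 5-tuple whose first element is the record and
    # whose fifth element is the total number of records.
    records = [{'pid': 'pid-a'}, {'pid': 'pid-b'}, {'pid': 'pid-c'}]
    for record in records:
        yield record, None, None, None, len(records)


source = fake_read_records()
try:
    # Peek at the first tuple to learn the total without reading everything ahead of time ...
    first_tuple = next(source)
except StopIteration:
    print('no records, skipping')
else:
    total = first_tuple[4]
    # ... then stitch the peeked tuple back in front of the remaining items.
    # The PR wraps this loop in rich's track(..., total=total) to drive the progress bar.
    for record, _, _, _, _ in chain([first_tuple], source):
        print(f'{record["pid"]} (of {total} records)')
```

This is also why the `map(lambda x: x[0], ...)` wrappers disappear from both call sites.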
@@ -149,55 +174,122 @@ def export(
        incoming_destination = destination / 'incoming' / label
        incoming_destination.mkdir(parents=True, exist_ok=False)
        _store_records(
            map(
                lambda x: x[0],
                incoming_read_records(
                    service_url=service_url,
                    collection=collection,
                    label=label,
                    token=token,
                    session=session,
                )
            incoming_read_records(
                service_url=service_url,
                collection=collection,
                label=label,
                token=token,
                session=session,
            ),
            incoming_destination,
            output_format,
            ignore_errors,
            keep_schema_type,
            source_name=f'incoming area: {label}'
        )

    return 0


def _store_records(
    source: Iterable,
    source: Generator,
    destination: Path,
    ignore_errors: bool = False,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    source_name: str,
):
    created_dirs = set()
    class_counters = defaultdict(count)

    for record in track(source, console=console):
        class_name = _de_prefix(record.get('schema_type', None))
        if class_name is None:
    # Get the first result from the source to determine the total number
    # of records.
    try:
        first_tuple = next(source)
    except StopIteration:
        console.print(f'no records in incoming [green]{source_name}[/green], skipping it')
        return

    total = first_tuple[4]
    for record, _, _, _, _ in track(chain([first_tuple], source), total=total, console=console):
        schema_type = record.get('schema_type', None)
        if schema_type is None:
            if ignore_errors:
                console.print(f'[red]Error[/red]: no `schema type` in record {record["pid"]}')
                console.print(f'[red]Error[/red]: no `schema type` in record [red]{record["pid"]}[/red] in {source_name}')
                continue
            msg = f'no `schema_type` in record {record["pid"]}'
            raise ValueError(msg)

        next_name_for_class = f'{next(class_counters[class_name]):09d}.json'
        class_name = _de_prefix(schema_type)
        if not keep_schema_type:
            del record['schema_type']

        hash_dir, hash_name = _hash_p3(record['pid'])
        file_dir, file_name = (
            destination / class_name / next_name_for_class[:3],
            next_name_for_class[3:]
            destination / class_name / hash_dir,
            hash_name,
        )
        if file_dir not in created_dirs:
            file_dir.mkdir(parents=True, exist_ok=False)
            created_dirs.add(file_dir)

        (file_dir / file_name).write_text(
            json.dumps(record, indent=2, ensure_ascii=False),
        )
        try:
            writer[output_format](
                file_dir=file_dir,
                file_name=file_name,
                record=record,
            )
        except KeyError as e:
            msg = f'unsupported output format: {output_format}'
            raise ValueError(msg)


def _de_prefix(
    name: str,
):
    return name.split(':', 1)[-1]


def _get_hex_digest(
    data: str,
) -> str:
    hash_context = hashlib.md5(data.encode())
    return hash_context.hexdigest()


def _hash_p3(
    pid: str,
) -> tuple[str, str]:
    hex_digest = _get_hex_digest(pid)
    return hex_digest[:3], hex_digest[3:]


def write_json(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    (file_dir / (file_name + '.json')).write_text(
        json.dumps(record, indent=2, ensure_ascii=False) + '\n',
    )


def write_yaml(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    (file_dir / (file_name + '.yaml')).write_text(
        yaml.dump(
            data=record,
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
        ),
    )


writer = {
    'json': write_json,
    'yaml': write_yaml,
}
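
The other substantive change in this file is how records land on disk: instead of a per-class counter that was split into a directory prefix and file name, the storage path is now derived from the MD5 digest of the record's `pid`, and the serialization is picked from the `writer` mapping according to `--format`. A minimal sketch of the resulting layout follows; the record contents, the `xyz:Person` schema type, and the `/tmp/export` destination are made up for illustration:

```python
import hashlib
import json
from pathlib import Path


def _get_hex_digest(data: str) -> str:
    # The MD5 digest of the pid determines where the record is stored.
    return hashlib.md5(data.encode()).hexdigest()


def _hash_p3(pid: str) -> tuple[str, str]:
    # The first three hex characters become a sharding directory,
    # the remaining 29 characters become the file name.
    hex_digest = _get_hex_digest(pid)
    return hex_digest[:3], hex_digest[3:]


# Hypothetical record and destination, for illustration only.
record = {'pid': 'example-pid', 'schema_type': 'xyz:Person', 'given_name': 'Jane'}
destination = Path('/tmp/export')

class_name = record.pop('schema_type').split(':', 1)[-1]  # _de_prefix(); dropped unless --keep-schema-type
hash_dir, hash_name = _hash_p3(record['pid'])
file_dir = destination / class_name / hash_dir
file_dir.mkdir(parents=True, exist_ok=True)               # the PR caches created dirs in a set instead

# Format dispatch as in the writer mapping ('json' -> write_json, 'yaml' -> write_yaml).
(file_dir / (hash_name + '.json')).write_text(
    json.dumps(record, indent=2, ensure_ascii=False) + '\n',
)
print(file_dir / (hash_name + '.json'))  # /tmp/export/Person/<3 hex chars>/<29 hex chars>.json
```

Sharding on the digest keeps any single class directory from accumulating every record of that class, without having to track per-class counters.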
@@ -9,6 +9,7 @@ authors = [
]
dependencies = [
    "click>=8.3.1",
    "pyyaml>=6.0.3",
    "requests>=2.32.5",
    "rich-click>=1.9.6",
]

uv.lock (generated)
@@ -373,6 +373,7 @@ version = "0.2.6"
source = { virtual = "." }
dependencies = [
    { name = "click" },
    { name = "pyyaml" },
    { name = "requests" },
    { name = "rich-click" },
]
@@ -390,6 +391,7 @@ requires-dist = [
    { name = "click", specifier = ">=8.3.1" },
    { name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.3.0" },
    { name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" },
    { name = "pyyaml", specifier = ">=6.0.3" },
    { name = "requests", specifier = ">=2.32.5" },
    { name = "rich-click", specifier = ">=1.9.6" },
]