Tackle issues reported in issue #14 #28

Merged
cmo merged 5 commits from issue-14 into master 2026-02-05 12:50:16 +00:00
3 changed files with 127 additions and 32 deletions

View file

@ -1,14 +1,19 @@
import hashlib
import json
import sys
from collections import defaultdict
from itertools import count
from itertools import (
chain,
count,
)
from pathlib import Path
from typing import (
Any,
Iterable,
Generator,
)
import rich_click as click
import yaml
from rich.console import Console
from rich.progress import track
@ -49,18 +54,34 @@ console = Console(file=sys.stderr)
),
metavar='DESTINATION_DIR',
)
@click.option(
'--format', '-f', 'output_format',
type=click.Choice(('json', 'yaml'), case_sensitive=True),
default='json',
help='select output format for the exported records (default: json)',
)
@click.option(
'--ignore-errors',
default=False,
is_flag=True,
help='ignore records with missing `schema_type` instead of raising an error',
)
@click.option(
'--keep-schema-type', '-k',
default=False,
is_flag=True,
help='keep `schema_type`-attribute in records on file-system. By default the '
'schema_type-attribute is removed because the class is encoded in the '
'storage path of the records.'
)
def cli(
obj: Any,
service_url: str,
collection: str,
destination: Path,
ignore_errors,
output_format: str,
ignore_errors: bool,
keep_schema_type: bool,
):
"""Export a collection to disk
@ -79,7 +100,9 @@ def cli(
service_url,
collection,
destination,
output_format,
ignore_errors,
keep_schema_type,
)
except HTTPError as e:
console.print(f'[red]Error[/red]: {e}: {e.response.text}')
@ -93,7 +116,9 @@ def export(
service_url: str,
collection: str,
destination: Path,
output_format: str,
ignore_errors: bool,
keep_schema_type: bool,
):
token = obj
@ -125,17 +150,17 @@ def export(
console.print('Exporting records from curated area')
_store_records(
map(
lambda x: x[0],
curated_read_records(
service_url=service_url,
collection=collection,
token=token,
session=session,
)
),
curated_destination,
output_format,
ignore_errors,
keep_schema_type,
source_name='curated area',
)
# Store the incoming records
@ -149,55 +174,122 @@ def export(
incoming_destination = destination / 'incoming' / label
incoming_destination.mkdir(parents=True, exist_ok=False)
_store_records(
map(
lambda x: x[0],
incoming_read_records(
service_url=service_url,
collection=collection,
label=label,
token=token,
session=session,
)
),
incoming_destination,
output_format,
ignore_errors,
keep_schema_type,
source_name=f'incoming area: {label}'
)
return 0
def _store_records(
source: Iterable,
source: Generator,
destination: Path,
ignore_errors: bool = False,
output_format: str,
ignore_errors: bool,
keep_schema_type: bool,
source_name: str,
):
created_dirs = set()
class_counters = defaultdict(count)
for record in track(source, console=console):
class_name = _de_prefix(record.get('schema_type', None))
if class_name is None:
# Get the first result from the source to determine the total number
# of records.
try:
first_tuple = next(source)
except StopIteration:
console.print(f'no records in incoming [green]{source_name}[/green], skipping it')
return
total = first_tuple[4]
for record, _, _, _, _ in track(chain([first_tuple], source), total=total, console=console):
schema_type = record.get('schema_type', None)
if schema_type is None:
if ignore_errors:
console.print(f'[red]Error[/red]: no `schema type` in record {record["pid"]}')
console.print(f'[red]Error[/red]: no `schema type` in record [red]{record["pid"]}[/red] in {source_name}')
continue
msg = f'no `schema_type` in record {record["pid"]}'
raise ValueError(msg)
next_name_for_class = f'{next(class_counters[class_name]):09d}.json'
class_name = _de_prefix(schema_type)
if not keep_schema_type:
del record['schema_type']
hash_dir, hash_name = _hash_p3(record['pid'])
file_dir, file_name = (
destination / class_name / next_name_for_class[:3],
next_name_for_class[3:]
destination / class_name / hash_dir,
hash_name,
)
if file_dir not in created_dirs:
file_dir.mkdir(parents=True, exist_ok=False)
created_dirs.add(file_dir)
(file_dir / file_name).write_text(
json.dumps(record, indent=2, ensure_ascii=False),
try:
writer[output_format](
file_dir=file_dir,
file_name=file_name,
record=record,
)
except KeyError as e:
msg = f'unsupported output format: {output_format}'
raise ValueError(msg)
def _de_prefix(
name: str,
):
return name.split(':', 1)[-1]
def _get_hex_digest(
data: str,
) -> str:
hash_context = hashlib.md5(data.encode())
return hash_context.hexdigest()
def _hash_p3(
pid: str,
) -> tuple[str, str]:
hex_digest = _get_hex_digest(pid)
return hex_digest[:3], hex_digest[3:]
def write_json(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write *record* to ``file_dir/<file_name>.json``.

    The output is pretty-printed (2-space indent), keeps non-ASCII
    characters unescaped, and is newline-terminated.
    """
    target = file_dir / f'{file_name}.json'
    serialized = json.dumps(record, indent=2, ensure_ascii=False)
    target.write_text(serialized + '\n')
def write_yaml(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write *record* to ``file_dir/<file_name>.yaml``.

    The output is block-style YAML, keeps non-ASCII characters
    unescaped, and preserves the record's key insertion order.
    """
    target = file_dir / (file_name + '.yaml')
    serialized = yaml.dump(
        data=record,
        sort_keys=False,
        allow_unicode=True,
        default_flow_style=False,
    )
    target.write_text(serialized)
# Dispatch table: value of the `--format` option -> record writer callable.
writer = dict(
    json=write_json,
    yaml=write_yaml,
)

View file

@ -9,6 +9,7 @@ authors = [
]
dependencies = [
"click>=8.3.1",
"pyyaml>=6.0.3",
"requests>=2.32.5",
"rich-click>=1.9.6",
]

2
uv.lock generated
View file

@ -373,6 +373,7 @@ version = "0.2.6"
source = { virtual = "." }
dependencies = [
{ name = "click" },
{ name = "pyyaml" },
{ name = "requests" },
{ name = "rich-click" },
]
@ -390,6 +391,7 @@ requires-dist = [
{ name = "click", specifier = ">=8.3.1" },
{ name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.3.0" },
{ name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" },
{ name = "pyyaml", specifier = ">=6.0.3" },
{ name = "requests", specifier = ">=2.32.5" },
{ name = "rich-click", specifier = ">=1.9.6" },
]