Tackle issues reported in issues #14 and #28

Merged
cmo merged 5 commits from issue-14 into master 2026-02-05 12:50:16 +00:00
3 changed files with 127 additions and 32 deletions

View file

@ -1,14 +1,19 @@
import hashlib
import json import json
import sys import sys
from collections import defaultdict from collections import defaultdict
from itertools import count from itertools import (
chain,
count,
)
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
Any, Any,
Iterable, Generator,
) )
import rich_click as click import rich_click as click
import yaml
from rich.console import Console from rich.console import Console
from rich.progress import track from rich.progress import track
@ -49,18 +54,34 @@ console = Console(file=sys.stderr)
), ),
metavar='DESTINATION_DIR', metavar='DESTINATION_DIR',
) )
@click.option(
'--format', '-f', 'output_format',
type=click.Choice(('json', 'yaml'), case_sensitive=True),
default='json',
help='select output format for the exported records (default: json)',
)
@click.option( @click.option(
'--ignore-errors', '--ignore-errors',
default=False, default=False,
is_flag=True, is_flag=True,
help='ignore records with missing `schema_type` instead of raising an error', help='ignore records with missing `schema_type` instead of raising an error',
) )
@click.option(
'--keep-schema-type', '-k',
default=False,
is_flag=True,
help='keep `schema_type`-attribute in records on file-system. By default the '
'schema_type-attribute is removed because the class is encoded in the '
'storage path of the records.'
)
def cli( def cli(
obj: Any, obj: Any,
service_url: str, service_url: str,
collection: str, collection: str,
destination: Path, destination: Path,
ignore_errors, output_format: str,
ignore_errors: bool,
keep_schema_type: bool,
): ):
"""Export a collection to disk """Export a collection to disk
@ -79,7 +100,9 @@ def cli(
service_url, service_url,
collection, collection,
destination, destination,
output_format,
ignore_errors, ignore_errors,
keep_schema_type,
) )
except HTTPError as e: except HTTPError as e:
console.print(f'[red]Error[/red]: {e}: {e.response.text}') console.print(f'[red]Error[/red]: {e}: {e.response.text}')
@ -93,7 +116,9 @@ def export(
service_url: str, service_url: str,
collection: str, collection: str,
destination: Path, destination: Path,
output_format: str,
ignore_errors: bool, ignore_errors: bool,
keep_schema_type: bool,
): ):
token = obj token = obj
@ -125,17 +150,17 @@ def export(
console.print('Exporting records from curated area') console.print('Exporting records from curated area')
_store_records( _store_records(
map( curated_read_records(
lambda x: x[0], service_url=service_url,
curated_read_records( collection=collection,
service_url=service_url, token=token,
collection=collection, session=session,
token=token,
session=session,
)
), ),
curated_destination, curated_destination,
output_format,
ignore_errors, ignore_errors,
keep_schema_type,
source_name='curated area',
) )
# Store the incoming records # Store the incoming records
@ -149,55 +174,122 @@ def export(
incoming_destination = destination / 'incoming' / label incoming_destination = destination / 'incoming' / label
incoming_destination.mkdir(parents=True, exist_ok=False) incoming_destination.mkdir(parents=True, exist_ok=False)
_store_records( _store_records(
map( incoming_read_records(
lambda x: x[0], service_url=service_url,
incoming_read_records( collection=collection,
service_url=service_url, label=label,
collection=collection, token=token,
label=label, session=session,
token=token,
session=session,
)
), ),
incoming_destination, incoming_destination,
output_format,
ignore_errors, ignore_errors,
keep_schema_type,
source_name=f'incoming area: {label}'
) )
return 0 return 0
def _store_records(
    source: Generator,
    destination: Path,
    output_format: str,
    ignore_errors: bool,
    keep_schema_type: bool,
    source_name: str,
):
    """Write all records yielded by `source` below `destination`.

    `source` yields 5-tuples whose first element is the record (a dict)
    and whose fifth element is the total number of records, which drives
    the progress bar.  Records are stored as
    ``destination/<class>/<md5[:3]>/<md5[3:]>.<format>`` where the md5 is
    taken over the record's ``pid``.

    :param source: generator of ``(record, _, _, _, total)`` tuples
    :param destination: root directory for the exported records
    :param output_format: key into the module-level ``writer`` table
        (``'json'`` or ``'yaml'``)
    :param ignore_errors: skip records without a ``schema_type`` instead
        of raising ``ValueError``
    :param keep_schema_type: keep the ``schema_type`` attribute in the
        stored record (by default it is removed because the class is
        encoded in the storage path)
    :param source_name: human-readable description of the source, used
        in console messages
    :raises ValueError: on an unsupported ``output_format``, or on a
        record without ``schema_type`` when ``ignore_errors`` is false
    """
    # Resolve the writer up front so an unsupported format fails fast,
    # before any record is consumed or any directory is created.  This
    # also keeps a `KeyError` raised inside a writer from being
    # misreported as an unsupported format.
    try:
        write_record = writer[output_format]
    except KeyError as e:
        msg = f'unsupported output format: {output_format}'
        raise ValueError(msg) from e

    created_dirs = set()
    # Get the first result from the source to determine the total number
    # of records.  An empty source is not an error.
    try:
        first_tuple = next(source)
    except StopIteration:
        # `source_name` already describes the area (curated or incoming).
        console.print(f'no records in [green]{source_name}[/green], skipping it')
        return
    total = first_tuple[4]
    for record, _, _, _, _ in track(chain([first_tuple], source), total=total, console=console):
        schema_type = record.get('schema_type', None)
        if schema_type is None:
            if ignore_errors:
                console.print(f'[red]Error[/red]: no `schema_type` in record [red]{record["pid"]}[/red] in {source_name}')
                continue
            msg = f'no `schema_type` in record {record["pid"]}'
            raise ValueError(msg)
        class_name = _de_prefix(schema_type)
        if not keep_schema_type:
            # The class is encoded in the storage path; keeping the
            # attribute on disk would be redundant.
            del record['schema_type']
        hash_dir, hash_name = _hash_p3(record['pid'])
        file_dir = destination / class_name / hash_dir
        if file_dir not in created_dirs:
            file_dir.mkdir(parents=True, exist_ok=False)
            created_dirs.add(file_dir)
        write_record(
            file_dir=file_dir,
            file_name=hash_name,
            record=record,
        )
def _de_prefix( def _de_prefix(
name: str, name: str,
): ):
return name.split(':', 1)[-1] return name.split(':', 1)[-1]
def _get_hex_digest(
data: str,
) -> str:
hash_context = hashlib.md5(data.encode())
return hash_context.hexdigest()
def _hash_p3(
    pid: str,
) -> tuple[str, str]:
    """Split the hex digest of *pid* into a path pair.

    Returns a tuple of the first three hex characters (used as a
    directory name) and the remaining characters (used as the file
    name stem).
    """
    digest = _get_hex_digest(pid)
    directory_part = digest[:3]
    file_part = digest[3:]
    return directory_part, file_part
def write_json(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write *record* as pretty-printed JSON to ``file_dir/file_name.json``.

    The output is indented, keeps non-ASCII characters unescaped, and
    ends with a trailing newline.
    """
    target = file_dir / f'{file_name}.json'
    serialized = json.dumps(record, indent=2, ensure_ascii=False)
    target.write_text(serialized + '\n')
def write_yaml(
    file_dir: Path,
    file_name: str,
    record: dict,
):
    """Write *record* as block-style YAML to ``file_dir/file_name.yaml``.

    Key order is preserved and non-ASCII characters are emitted
    unescaped.
    """
    target = file_dir / f'{file_name}.yaml'
    serialized = yaml.dump(
        data=record,
        sort_keys=False,
        allow_unicode=True,
        default_flow_style=False,
    )
    target.write_text(serialized)
writer = {
'json': write_json,
'yaml': write_yaml,
}

View file

@ -9,6 +9,7 @@ authors = [
] ]
dependencies = [ dependencies = [
"click>=8.3.1", "click>=8.3.1",
"pyyaml>=6.0.3",
"requests>=2.32.5", "requests>=2.32.5",
"rich-click>=1.9.6", "rich-click>=1.9.6",
] ]

2
uv.lock generated
View file

@ -373,6 +373,7 @@ version = "0.2.6"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "click" }, { name = "click" },
{ name = "pyyaml" },
{ name = "requests" }, { name = "requests" },
{ name = "rich-click" }, { name = "rich-click" },
] ]
@ -390,6 +391,7 @@ requires-dist = [
{ name = "click", specifier = ">=8.3.1" }, { name = "click", specifier = ">=8.3.1" },
{ name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.3.0" }, { name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.3.0" },
{ name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" }, { name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" },
{ name = "pyyaml", specifier = ">=6.0.3" },
{ name = "requests", specifier = ">=2.32.5" }, { name = "requests", specifier = ">=2.32.5" },
{ name = "rich-click", specifier = ">=1.9.6" }, { name = "rich-click", specifier = ">=1.9.6" },
] ]