Add progress output, remove token logging #22

Merged
cmo merged 2 commits from issue-21 into master 2026-02-03 15:58:33 +00:00

View file

@ -2,8 +2,19 @@ import json
import logging
import re
import sys
from itertools import (
count,
chain,
)
from typing import (
cast,
Iterable,
)
import rich_click as click
from rich import print as rprint
from rich.console import Console
from rich.progress import track
from ...communicate import (
HTTPError,
@ -114,7 +125,11 @@ def cli(
dry_run,
)
except HTTPError as e:
print(f'ERROR: {e}: {e.response.text}', file=sys.stderr, flush=True)
rprint(
f'[red]Error[/red]: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
return 1
@ -135,15 +150,13 @@ def auto_curate(
curator_token = obj
if curator_token is None:
print(
f'ERROR: no token was provided (use --token or DTC_TOKEN environment variable)',
rprint(
f'[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)',
file=sys.stderr,
flush=True,
)
return 1
click.echo(f'auto curate: {obj}')
output = None
# If --list-labels and --list-records are provided, keep only the latter,
@ -157,12 +170,12 @@ def auto_curate(
if list_labels:
output = []
for label in incoming_read_labels(
service_url=service_url,
collection=collection,
token=obj,
):
all_labels = incoming_read_labels(
service_url=service_url,
collection=collection,
token=obj,
)
for label in all_labels:
if include and label not in include:
logger.debug('ignoring non-included incoming label: %s', label)
continue
@ -178,13 +191,38 @@ def auto_curate(
if list_records:
output[label] = []
for record, _, _, _, _ in incoming_read_records(
service_url=service_url,
collection=collection,
label=label,
token=obj,
):
# Get the total number of entries for the
record_source = incoming_read_records(
service_url=service_url,
collection=collection,
label=label,
token=obj,
)
# Get the first entry to find the total number of records
try:
first_record, _, _, _, total = next(record_source)
except StopIteration:
rprint(
f'no records in incoming area [green]{label}[/green], skipping it',
file=sys.stderr,
flush=True,
)
continue
# Get the first entry an all other entries
for index, (record, _, _, _, total) in track(
zip(
count(),
chain(
[(first_record, None, None, None, None)],
cast(Iterable, record_source),
),
),
description=f'processing [green]{label}[/green]',
total=total,
console=Console(file=sys.stderr),
):
if list_records:
output[label].append(record)
continue
@ -204,43 +242,56 @@ def auto_curate(
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
except (IndexError, KeyError):
global stl_info
rprint(f'[yellow]Warning[/yellow]: ignoring record with pid {record["pid"]} because `schema_type` attribute is missing.')
if not stl_info:
logger.warning(
f"""Could not find `schema_type` attribute in record with
pid {record['pid']}. Please ensure that `schema_type` is stored in
the records or that the associated incoming area store has a backend
with a "Schema Type Layer", i.e., "record_dir+stl" or
"sqlite+stl".""",
print(
' Please ensure that `schema_type` is stored in the records '
'or that the associated incoming area store has a backend with a '
'"Schema Type Layer", i.e., "record_dir+stl" or "sqlite+stl"."',
)
stl_info = True
else:
logger.warning(f'ignoring record with pid {record["pid"]}, `schema_type` attribute is missing.')
continue
if dry_run:
print(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
print(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
click.echo(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
click.echo(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
continue
# Store record in destination collection
curated_write_record(
service_url=destination_service_url,
collection=destination_collection,
class_name=class_name,
record=record,
token=destination_token)
try:
curated_write_record(
service_url=destination_service_url,
collection=destination_collection,
class_name=class_name,
record=record,
token=destination_token)
except HTTPError as e:
rprint(
f'[red]Error[/red]: writing record with pid {record["pid"]} failed: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
raise
# Delete record from incoming area
incoming_delete_record(
service_url=service_url,
collection=collection,
label=label,
pid=record['pid'],
token=obj,
)
try:
incoming_delete_record(
service_url=service_url,
collection=collection,
label=label,
pid=record['pid'],
token=obj,
)
except HTTPError as e:
rprint(
f'[red]ERROR[/red]: deleting record with pid {record["pid"]} failed: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
raise
if output is not None:
print(json.dumps(output, ensure_ascii=False))
rprint(json.dumps(output, ensure_ascii=False))
return 0