Add progress output, remove token logging #22

Merged
cmo merged 2 commits from issue-21 into master 2026-02-03 15:58:33 +00:00

View file

@ -2,8 +2,19 @@ import json
import logging import logging
import re import re
import sys import sys
from itertools import (
count,
chain,
)
from typing import (
cast,
Iterable,
)
import rich_click as click import rich_click as click
from rich import print as rprint
from rich.console import Console
from rich.progress import track
from ...communicate import ( from ...communicate import (
HTTPError, HTTPError,
@ -114,7 +125,11 @@ def cli(
dry_run, dry_run,
) )
except HTTPError as e: except HTTPError as e:
print(f'ERROR: {e}: {e.response.text}', file=sys.stderr, flush=True) rprint(
f'[red]Error[/red]: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
return 1 return 1
@ -135,15 +150,13 @@ def auto_curate(
curator_token = obj curator_token = obj
if curator_token is None: if curator_token is None:
print( rprint(
f'ERROR: no token was provided (use --token or DTC_TOKEN environment variable)', f'[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)',
file=sys.stderr, file=sys.stderr,
flush=True, flush=True,
) )
return 1 return 1
click.echo(f'auto curate: {obj}')
output = None output = None
# If --list-labels and --list-records are provided, keep only the latter, # If --list-labels and --list-records are provided, keep only the latter,
@ -157,12 +170,12 @@ def auto_curate(
if list_labels: if list_labels:
output = [] output = []
for label in incoming_read_labels( all_labels = incoming_read_labels(
service_url=service_url, service_url=service_url,
collection=collection, collection=collection,
token=obj, token=obj,
): )
for label in all_labels:
if include and label not in include: if include and label not in include:
logger.debug('ignoring non-included incoming label: %s', label) logger.debug('ignoring non-included incoming label: %s', label)
continue continue
@ -178,13 +191,38 @@ def auto_curate(
if list_records: if list_records:
output[label] = [] output[label] = []
for record, _, _, _, _ in incoming_read_records( # Get the total number of entries for the
service_url=service_url, record_source = incoming_read_records(
collection=collection, service_url=service_url,
label=label, collection=collection,
token=obj, label=label,
): token=obj,
)
# Get the first entry to find the total number of records
try:
first_record, _, _, _, total = next(record_source)
except StopIteration:
rprint(
f'no records in incoming area [green]{label}[/green], skipping it',
file=sys.stderr,
flush=True,
)
continue
# Get the first entry an all other entries
for index, (record, _, _, _, total) in track(
zip(
count(),
chain(
[(first_record, None, None, None, None)],
cast(Iterable, record_source),
),
),
description=f'processing [green]{label}[/green]',
total=total,
console=Console(file=sys.stderr),
):
if list_records: if list_records:
output[label].append(record) output[label].append(record)
continue continue
@ -204,43 +242,56 @@ def auto_curate(
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0) class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
except (IndexError, KeyError): except (IndexError, KeyError):
global stl_info global stl_info
rprint(f'[yellow]Warning[/yellow]: ignoring record with pid {record["pid"]} because `schema_type` attribute is missing.')
if not stl_info: if not stl_info:
logger.warning( print(
f"""Could not find `schema_type` attribute in record with ' Please ensure that `schema_type` is stored in the records '
pid {record['pid']}. Please ensure that `schema_type` is stored in 'or that the associated incoming area store has a backend with a '
the records or that the associated incoming area store has a backend '"Schema Type Layer", i.e., "record_dir+stl" or "sqlite+stl"."',
with a "Schema Type Layer", i.e., "record_dir+stl" or
"sqlite+stl".""",
) )
stl_info = True stl_info = True
else:
logger.warning(f'ignoring record with pid {record["pid"]}, `schema_type` attribute is missing.')
continue continue
if dry_run: if dry_run:
print(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"') click.echo(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
print(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"') click.echo(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
continue continue
# Store record in destination collection # Store record in destination collection
curated_write_record( try:
service_url=destination_service_url, curated_write_record(
collection=destination_collection, service_url=destination_service_url,
class_name=class_name, collection=destination_collection,
record=record, class_name=class_name,
token=destination_token) record=record,
token=destination_token)
except HTTPError as e:
rprint(
f'[red]Error[/red]: writing record with pid {record["pid"]} failed: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
raise
# Delete record from incoming area # Delete record from incoming area
incoming_delete_record( try:
service_url=service_url, incoming_delete_record(
collection=collection, service_url=service_url,
label=label, collection=collection,
pid=record['pid'], label=label,
token=obj, pid=record['pid'],
) token=obj,
)
except HTTPError as e:
rprint(
f'[red]ERROR[/red]: deleting record with pid {record["pid"]} failed: {e}: {e.response.text}',
file=sys.stderr,
flush=True,
)
raise
if output is not None: if output is not None:
print(json.dumps(output, ensure_ascii=False)) rprint(json.dumps(output, ensure_ascii=False))
return 0 return 0