Add progress output, remove token logging #22
1 changed files with 92 additions and 41 deletions
|
|
@ -2,8 +2,19 @@ import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
from itertools import (
|
||||||
|
count,
|
||||||
|
chain,
|
||||||
|
)
|
||||||
|
from typing import (
|
||||||
|
cast,
|
||||||
|
Iterable,
|
||||||
|
)
|
||||||
|
|
||||||
import rich_click as click
|
import rich_click as click
|
||||||
|
from rich import print as rprint
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.progress import track
|
||||||
|
|
||||||
from ...communicate import (
|
from ...communicate import (
|
||||||
HTTPError,
|
HTTPError,
|
||||||
|
|
@ -114,7 +125,11 @@ def cli(
|
||||||
dry_run,
|
dry_run,
|
||||||
)
|
)
|
||||||
except HTTPError as e:
|
except HTTPError as e:
|
||||||
print(f'ERROR: {e}: {e.response.text}', file=sys.stderr, flush=True)
|
rprint(
|
||||||
|
f'[red]Error[/red]: {e}: {e.response.text}',
|
||||||
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -135,15 +150,13 @@ def auto_curate(
|
||||||
curator_token = obj
|
curator_token = obj
|
||||||
|
|
||||||
if curator_token is None:
|
if curator_token is None:
|
||||||
print(
|
rprint(
|
||||||
f'ERROR: no token was provided (use --token or DTC_TOKEN environment variable)',
|
f'[red]Error[/red]: no token was provided (use --token or DTC_TOKEN environment variable)',
|
||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
flush=True,
|
flush=True,
|
||||||
)
|
)
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
click.echo(f'auto curate: {obj}')
|
|
||||||
|
|
||||||
output = None
|
output = None
|
||||||
|
|
||||||
# If --list-labels and --list-records are provided, keep only the latter,
|
# If --list-labels and --list-records are provided, keep only the latter,
|
||||||
|
|
@ -157,12 +170,12 @@ def auto_curate(
|
||||||
if list_labels:
|
if list_labels:
|
||||||
output = []
|
output = []
|
||||||
|
|
||||||
for label in incoming_read_labels(
|
all_labels = incoming_read_labels(
|
||||||
service_url=service_url,
|
service_url=service_url,
|
||||||
collection=collection,
|
collection=collection,
|
||||||
token=obj,
|
token=obj,
|
||||||
):
|
)
|
||||||
|
for label in all_labels:
|
||||||
if include and label not in include:
|
if include and label not in include:
|
||||||
logger.debug('ignoring non-included incoming label: %s', label)
|
logger.debug('ignoring non-included incoming label: %s', label)
|
||||||
continue
|
continue
|
||||||
|
|
@ -178,13 +191,38 @@ def auto_curate(
|
||||||
if list_records:
|
if list_records:
|
||||||
output[label] = []
|
output[label] = []
|
||||||
|
|
||||||
for record, _, _, _, _ in incoming_read_records(
|
# Get the total number of entries for the
|
||||||
|
record_source = incoming_read_records(
|
||||||
service_url=service_url,
|
service_url=service_url,
|
||||||
collection=collection,
|
collection=collection,
|
||||||
label=label,
|
label=label,
|
||||||
token=obj,
|
token=obj,
|
||||||
):
|
)
|
||||||
|
|
||||||
|
# Get the first entry to find the total number of records
|
||||||
|
try:
|
||||||
|
first_record, _, _, _, total = next(record_source)
|
||||||
|
except StopIteration:
|
||||||
|
rprint(
|
||||||
|
f'no records in incoming area [green]{label}[/green], skipping it',
|
||||||
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get the first entry an all other entries
|
||||||
|
for index, (record, _, _, _, total) in track(
|
||||||
|
zip(
|
||||||
|
count(),
|
||||||
|
chain(
|
||||||
|
[(first_record, None, None, None, None)],
|
||||||
|
cast(Iterable, record_source),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
description=f'processing [green]{label}[/green]',
|
||||||
|
total=total,
|
||||||
|
console=Console(file=sys.stderr),
|
||||||
|
):
|
||||||
if list_records:
|
if list_records:
|
||||||
output[label].append(record)
|
output[label].append(record)
|
||||||
continue
|
continue
|
||||||
|
|
@ -204,33 +242,39 @@ def auto_curate(
|
||||||
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
|
class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
|
||||||
except (IndexError, KeyError):
|
except (IndexError, KeyError):
|
||||||
global stl_info
|
global stl_info
|
||||||
|
rprint(f'[yellow]Warning[/yellow]: ignoring record with pid {record["pid"]} because `schema_type` attribute is missing.')
|
||||||
if not stl_info:
|
if not stl_info:
|
||||||
logger.warning(
|
print(
|
||||||
f"""Could not find `schema_type` attribute in record with
|
' Please ensure that `schema_type` is stored in the records '
|
||||||
pid {record['pid']}. Please ensure that `schema_type` is stored in
|
'or that the associated incoming area store has a backend with a '
|
||||||
the records or that the associated incoming area store has a backend
|
'"Schema Type Layer", i.e., "record_dir+stl" or "sqlite+stl"."',
|
||||||
with a "Schema Type Layer", i.e., "record_dir+stl" or
|
|
||||||
"sqlite+stl".""",
|
|
||||||
)
|
)
|
||||||
stl_info = True
|
stl_info = True
|
||||||
else:
|
|
||||||
logger.warning(f'ignoring record with pid {record["pid"]}, `schema_type` attribute is missing.')
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if dry_run:
|
if dry_run:
|
||||||
print(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
|
click.echo(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
|
||||||
print(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
|
click.echo(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Store record in destination collection
|
# Store record in destination collection
|
||||||
|
try:
|
||||||
curated_write_record(
|
curated_write_record(
|
||||||
service_url=destination_service_url,
|
service_url=destination_service_url,
|
||||||
collection=destination_collection,
|
collection=destination_collection,
|
||||||
class_name=class_name,
|
class_name=class_name,
|
||||||
record=record,
|
record=record,
|
||||||
token=destination_token)
|
token=destination_token)
|
||||||
|
except HTTPError as e:
|
||||||
|
rprint(
|
||||||
|
f'[red]Error[/red]: writing record with pid {record["pid"]} failed: {e}: {e.response.text}',
|
||||||
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
# Delete record from incoming area
|
# Delete record from incoming area
|
||||||
|
try:
|
||||||
incoming_delete_record(
|
incoming_delete_record(
|
||||||
service_url=service_url,
|
service_url=service_url,
|
||||||
collection=collection,
|
collection=collection,
|
||||||
|
|
@ -238,9 +282,16 @@ def auto_curate(
|
||||||
pid=record['pid'],
|
pid=record['pid'],
|
||||||
token=obj,
|
token=obj,
|
||||||
)
|
)
|
||||||
|
except HTTPError as e:
|
||||||
|
rprint(
|
||||||
|
f'[red]ERROR[/red]: deleting record with pid {record["pid"]} failed: {e}: {e.response.text}',
|
||||||
|
file=sys.stderr,
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
if output is not None:
|
if output is not None:
|
||||||
print(json.dumps(output, ensure_ascii=False))
|
rprint(json.dumps(output, ensure_ascii=False))
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue