Introduce dtc, a single dump-things-client command with subcommands #10

Merged
cmo merged 11 commits from dtc into master 2026-01-22 15:51:59 +00:00
15 changed files with 1046 additions and 538 deletions

View file

@ -1,3 +1,12 @@
# 0.2.0 (2026-01-19)
## New features
- Add the `dtc` command. `dtc` replaces `auto-curate`, `get-records`, `post-records`,
and `read-pages`, which are now subcommands of `dtc`. It also provides the new
subcommands `list-incoming` and `clean-incoming`.
# 0.1.4 (2026-01-19)
## New features

View file

@ -1,214 +0,0 @@
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import sys
from ..communicate import (
HTTPError,
curated_write_record,
incoming_delete_record,
incoming_read_labels,
incoming_read_records,
)
# Logger used for debug messages about skipped labels and records
logger = logging.getLogger('auto-curate')
# Name of the environment variable from which the curator token is read
token_name = 'DUMPTHINGS_TOKEN'
# Set to True once the detailed hint about a missing `schema_type`
# attribute has been printed, so that the hint is shown only once per run
stl_info = False
# Description text that is passed to the argument parser
description=f"""
Automatically move records from the incoming areas of a
collection to the curated area of the same collection, or to
the curated area of another collection.
The environment variable "{token_name}" must contain a token
which used to authenticate the requests. The token must have
curator-rights.
"""
def _main():
    """Parse the command line and curate, or list, incoming records.

    Depending on the arguments, the function either lists the inbox labels
    (`-l`), lists the records of all selected inboxes (`-r`), or writes every
    selected record to the curated area of the destination collection and
    deletes it from the inbox it was read from. With `-d/--dry-run` the
    write and delete operations are only printed.

    The curator token is read from the environment variable named by
    `token_name`.

    Returns:
        int: 0 on success, 1 if the token environment variable is not set.
    """
    global stl_info

    argument_parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    argument_parser.add_argument('service_url', metavar='SOURCE_SERVICE_URL')
    argument_parser.add_argument('collection', metavar='SOURCE_COLLECTION')
    argument_parser.add_argument(
        '--destination-service-url',
        default=None,
        metavar='DEST_SERVICE_URL',
        help='select a different dump-thing-service, i.e. not SOURCE_SERVICE_URL, as destination for auto-curated records',
    )
    argument_parser.add_argument(
        '--destination-collection',
        default=None,
        metavar='DEST_COLLECTION',
        help='select a different collection, i.e. not the SOURCE_COLLECTION of SOURCE_SERVICE_URL, as destination for auto-curated records',
    )
    argument_parser.add_argument(
        '--destination-token',
        default=None,
        metavar='DEST_TOKEN',
        # The fallback token is read from the variable named by `token_name`
        help=f'if provided, this token will be used for the destination service, otherwise the token from the environment variable "{token_name}" will be used',
    )
    argument_parser.add_argument(
        '-e', '--exclude',
        action='append',
        default=[],
        help='exclude an inbox on the source collection (repeatable)',
    )
    argument_parser.add_argument(
        '-i', '--include',
        action='append',
        default=[],
        help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
    )
    argument_parser.add_argument(
        '-l', '--list-labels',
        action='store_true',
        help='list the inbox labels of the given source collection, do not perform any curation',
    )
    argument_parser.add_argument(
        '-r', '--list-records',
        action='store_true',
        help='list records in the inboxes of the given source collection, do not perform any curation',
    )
    argument_parser.add_argument(
        '-p', '--pid',
        action='append',
        help='if provided, process only records that match the given PIDs. NOTE: matching does not involve CURIE-resolution!',
    )
    argument_parser.add_argument(
        '-d', '--dry-run',
        action='store_true',
        help='if provided, do not alter any data, instead print what would be done',
    )
    arguments = argument_parser.parse_args()

    curator_token = os.environ.get(token_name)
    if curator_token is None:
        print(f'ERROR: environment variable "{token_name}" not set', file=sys.stderr, flush=True)
        return 1

    # Destination settings default to the corresponding source settings
    destination_url = arguments.destination_service_url or arguments.service_url
    destination_collection = arguments.destination_collection or arguments.collection
    destination_token = arguments.destination_token or curator_token

    # `output` collects the listing result; it stays `None` when curating
    output = None

    # If --list-labels and --list-records are provided, keep only the latter,
    # because it includes listing of labels
    if arguments.list_records:
        if arguments.list_labels:
            print('WARNING: `-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`', file=sys.stderr, flush=True)
            arguments.list_labels = False
        output = {}

    if arguments.list_labels:
        output = []

    for label in incoming_read_labels(
            service_url=arguments.service_url,
            collection=arguments.collection,
            token=curator_token):

        if arguments.include and label not in arguments.include:
            logger.debug('ignoring non-included incoming label: %s', label)
            continue

        if label in arguments.exclude:
            logger.debug('ignoring excluded incoming label: %s', label)
            continue

        if arguments.list_labels:
            output.append(label)
            continue

        if arguments.list_records:
            output[label] = []

        for record, _, _, _, _ in incoming_read_records(
                service_url=arguments.service_url,
                collection=arguments.collection,
                label=label,
                token=curator_token):

            if arguments.pid:
                if record['pid'] not in arguments.pid:
                    logger.debug(
                        'ignoring record with non-matching pid: %s',
                        record['pid'])
                    continue

            if arguments.list_records:
                output[label].append(record)
                continue

            # Get the class name from the `schema_type` attribute. This requires
            # that the schema type is either stored in the record or that the
            # store has a "Schema Type Layer", i.e., the store type is
            # `record_dir+stl`, or `sqlite+stl`.
            try:
                class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
            except (IndexError, KeyError):
                # Print the detailed hint only for the first affected record
                if not stl_info:
                    print(
                        f"""Could not find `schema_type` attribute in record with
pid {record['pid']}. Please ensure that `schema_type` is stored in
the records or that the associated incoming area store has a backend
with a "Schema Type Layer", i.e., "record_dir+stl" or
"sqlite+stl".""",
                        file=sys.stderr,
                        flush=True)
                    stl_info = True
                print(
                    f'WARNING: ignoring record with pid {record["pid"]}, `schema_type` attribute is missing.',
                    file=sys.stderr,
                    flush=True)
                continue

            if arguments.dry_run:
                print(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_url}"')
                print(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{arguments.collection}@{arguments.service_url}"')
                continue

            # Store record in destination collection
            curated_write_record(
                service_url=destination_url,
                collection=destination_collection,
                class_name=class_name,
                record=record,
                token=destination_token)

            # Delete record from incoming area
            incoming_delete_record(
                service_url=arguments.service_url,
                collection=arguments.collection,
                label=label,
                pid=record['pid'],
                token=curator_token,
            )

    if output is not None:
        print(json.dumps(output, ensure_ascii=False))
    return 0
def main():
    """Run `_main()` and turn HTTP errors into an error message.

    Returns:
        int: the result of `_main()`, or 1 if an `HTTPError` was raised.
    """
    try:
        exit_code = _main()
    except HTTPError as error:
        print(
            f'ERROR: {error}: {error.response.text}',
            file=sys.stderr,
            flush=True,
        )
        exit_code = 1
    return exit_code
# When executed as a script, run the command and use the value returned
# by `main()` as the exit status of the process
if __name__ == '__main__':
    sys.exit(main())

View file

@ -0,0 +1,60 @@
import logging
import importlib
import pkgutil
from pathlib import Path
import click
# Directory next to this module that is scanned for subcommand plugins
dtc_plugins_dir = Path(__file__).parent / 'dtc_plugins'
# This will add a stream handler
logging.basicConfig(level=logging.WARNING)
def load_subcommands(group):
    """Load all sub-command plugins and register them with the group

    Every module found in `dtc_plugins_dir` is imported. A module is treated
    as a plugin if it defines both a `cli` attribute (the command object) and
    a `subcommand_name` attribute (the name to register it under). Modules
    that lack either attribute are skipped.

    Args:
        group: object with an `add_command(cmd=..., name=...)` method, e.g.
            a `click` group.

    Raises:
        SystemExit: with exit status 1 if importing a module fails. The
            exception that caused the failure is logged beforehand.
    """
    # Pass a `str`: `pkgutil.iter_modules` does not handle `Path` objects
    # correctly in every Python version.
    for module_info in pkgutil.iter_modules([str(dtc_plugins_dir)]):
        try:
            module = importlib.import_module(
                '.' + module_info.name,
                package='dump_things_pyclient.commands.dtc_plugins',
            )
        except Exception:
            logging.exception('failed to load plugin module %s', module_info)
            raise SystemExit(1)

        # get the plugin attributes
        plugin_cli = getattr(module, 'cli', None)
        command_name = getattr(module, 'subcommand_name', None)

        # skip non-plugin files
        if plugin_cli is None or command_name is None:
            continue
        group.add_command(cmd=plugin_cli, name=command_name)
@click.group()
@click.option(
    '--token',
    envvar='DTC_TOKEN',
    default=None,
    help='provide a token on the command line, NOTE: on multiuser systems you should use the environment variable DTC_TOKEN instead',
)
@click.option(
    '--debug',
    envvar='DTC_DEBUG',
    is_flag=True,
    default=False,
    help='show debug output',
)
@click.pass_context
def cli(ctx, token: str, debug: bool):
    # Entry point of the command group: configure logging according to
    # `--debug` and store the token (or `None`) as context object, which
    # subcommands receive through `click.pass_obj`.
    initialize_logging(debug)
    ctx.obj = token
def initialize_logging(debug: bool):
    """Reconfigure the root logger for normal or debug output.

    Args:
        debug: if true, the log level is set to DEBUG, otherwise to INFO.
    """
    level = logging.INFO
    if debug:
        level = logging.DEBUG
    # `force=True` replaces a configuration that was installed earlier
    logging.basicConfig(level=level, force=True)
# Load all command plugins from the submodule `.dtc_plugins` and register
# them as subcommands of the `cli` group.
load_subcommands(cli)
# Run the command group when the module is executed as a script
if __name__ == '__main__':
    cli()

View file

@ -0,0 +1,24 @@
"""Subcommands for dtc

Each module implements a subcommand. To add a new subcommand, add a
module with the following attributes:

- `cli`: a `click.command`
- `subcommand_name`: the name of the subcommand

The following example shows the implementation of the subcommand `demo`:

```python
import click


@click.command()
@click.pass_obj
def cli(obj):
    click.echo(f'demo with custom object: {obj}')


subcommand_name = 'demo'
```

The parameter `obj` will contain a token, if one was given by the user,
or `None`.
"""

View file

@ -0,0 +1,248 @@
import json
import logging
import re
import sys
import click
from ...communicate import (
HTTPError,
curated_write_record,
incoming_delete_record,
incoming_read_labels,
incoming_read_records,
)
# Logger used for warnings and debug messages of this subcommand
logger = logging.getLogger('auto-curate')
# Set to True once the detailed hint about a missing `schema_type`
# attribute has been logged, so that the hint is emitted only once per run
stl_info = False
@click.command(short_help='Move records from inbox to curate area of a collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.option(
    '--destination-service-url',
    metavar='DEST_SERVICE_URL',
    help='select a different dump-thing-service, i.e. not SERVICE_URL, as destination for auto-curated records',
)
@click.option(
    '--destination-collection',
    metavar='DEST_COLLECTION',
    help='select a different collection, i.e. not the COLLECTION of SERVICE_URL, as destination for auto-curated records',
)
@click.option(
    '--destination-token',
    metavar='DEST_TOKEN',
    help='if provided, this token will be used to authenticate against DEST_SERVICE_URL, otherwise the token for SERVICE_URL will be used',
)
@click.option(
    '--pid', '-p',
    metavar='PID',
    help='if provided, process only records that match the given PIDs (repeatable). NOTE: matching does not involve CURIE-resolution',
    # Several PIDs can be given; without `multiple` the value is a single
    # string and the membership test would match substrings.
    multiple=True,
)
@click.option(
    '--exclude', '-e',
    help='exclude an inbox on the source collection (repeatable)',
    multiple=True,
)
@click.option(
    '--include', '-i',
    help='process only the given inbox, all other inboxes are ignored (repeatable, -e/--exclude is applied after inclusion)',
    multiple=True,
)
@click.option(
    '--list-labels', '-l',
    help='list the inbox labels of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--list-records', '-r',
    help='list records in the inboxes of the given source collection, do not perform any curation',
    default=False,
    is_flag=True,
)
@click.option(
    '--dry-run', '-d',
    help='if provided, do not alter any data, instead print what would be done',
    default=False,
    is_flag=True,
)
def cli(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    dry_run,
):
    """Automatically move records from the incoming areas of the collection
    COLLECTION in the service SERVICE_URL to the curated area of the same
    collection, or to the curated area of another collection, possibly on
    another service.

    A token is required and will be used to authenticate the requests.
    The token must have curator-rights."""
    try:
        exit_code = auto_curate(
            obj,
            service_url,
            collection,
            destination_service_url,
            destination_collection,
            destination_token,
            pid,
            exclude,
            include,
            list_labels,
            list_records,
            dry_run,
        )
    except HTTPError as e:
        print(f'ERROR: {e}: {e.response.text}', file=sys.stderr, flush=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def auto_curate(
    obj,
    service_url,
    collection,
    destination_service_url,
    destination_collection,
    destination_token,
    pid,
    exclude,
    include,
    list_labels,
    list_records,
    dry_run,
):
    """Curate, or list, the records in the inboxes of a collection.

    Depending on the flags, the function either lists the inbox labels
    (`list_labels`), lists the records of all selected inboxes
    (`list_records`), or writes every selected record to the curated area of
    the destination collection and deletes it from the inbox it was read
    from. With `dry_run` the write and delete operations are only printed.

    Args:
        obj: token used for the source service, `None` if no token was given.
        service_url: URL of the source service.
        collection: name of the source collection.
        destination_service_url: URL of the destination service, defaults to
            `service_url` if empty or `None`.
        destination_collection: name of the destination collection, defaults
            to `collection` if empty or `None`.
        destination_token: token for the destination service, defaults to
            the source token if empty or `None`.
        pid: a PID or a sequence of PIDs; if given, only records with one of
            these PIDs are curated.
        exclude: inbox labels that should be skipped.
        include: if not empty, only these inbox labels are processed.
        list_labels: list inbox labels instead of curating.
        list_records: list inbox records instead of curating.
        dry_run: print what would be done instead of doing it.

    Returns:
        int: 0 on success, 1 if no token was provided.
    """
    global stl_info

    curator_token = obj
    if curator_token is None:
        print(
            'ERROR: no token was provided (use --token or DTC_TOKEN environment variable)',
            file=sys.stderr,
            flush=True,
        )
        return 1

    # Destination settings default to the corresponding source settings
    destination_service_url = destination_service_url or service_url
    destination_collection = destination_collection or collection
    destination_token = destination_token or curator_token

    # Accept a single PID as well as a sequence of PIDs. A plain string must
    # be wrapped, otherwise `in` would perform a substring test.
    if isinstance(pid, str):
        pid = (pid,)

    # `output` collects the listing result; it stays `None` when curating
    output = None

    # If --list-labels and --list-records are provided, keep only the latter,
    # because it includes listing of labels
    if list_records:
        if list_labels:
            logger.warning('`-l/--list-labels` and `-r/--list-records` defined, ignoring `-l/--list-labels`')
            list_labels = False
        output = {}

    if list_labels:
        output = []

    for label in incoming_read_labels(
        service_url=service_url,
        collection=collection,
        token=curator_token,
    ):
        if include and label not in include:
            logger.debug('ignoring non-included incoming label: %s', label)
            continue

        if label in exclude:
            logger.debug('ignoring excluded incoming label: %s', label)
            continue

        if list_labels:
            output.append(label)
            continue

        if list_records:
            output[label] = []

        for record, _, _, _, _ in incoming_read_records(
            service_url=service_url,
            collection=collection,
            label=label,
            token=curator_token,
        ):
            if list_records:
                output[label].append(record)
                continue

            if pid:
                if record['pid'] not in pid:
                    logger.debug(
                        'ignoring record with non-matching pid: %s',
                        record['pid'])
                    continue

            # Get the class name from the `schema_type` attribute. This requires
            # that the schema type is either stored in the record or that the
            # store has a "Schema Type Layer", i.e., the store type is
            # `record_dir+stl`, or `sqlite+stl`.
            try:
                class_name = re.search('([_A-Za-z0-9]*$)', record['schema_type']).group(0)
            except (IndexError, KeyError):
                # The detailed hint is logged for the first affected record
                # only, later records get the short message.
                if not stl_info:
                    logger.warning(
                        f"""Could not find `schema_type` attribute in record with
pid {record['pid']}. Please ensure that `schema_type` is stored in
the records or that the associated incoming area store has a backend
with a "Schema Type Layer", i.e., "record_dir+stl" or
"sqlite+stl".""",
                    )
                    stl_info = True
                else:
                    logger.warning(f'ignoring record with pid {record["pid"]}, `schema_type` attribute is missing.')
                continue

            if dry_run:
                print(f'WRITE record "{record["pid"]}" of class "{class_name}" to "{destination_collection}@{destination_service_url}"')
                print(f'DELETE record "{record["pid"]}" from inbox "{label}" of "{collection}@{service_url}"')
                continue

            # Store record in destination collection
            curated_write_record(
                service_url=destination_service_url,
                collection=destination_collection,
                class_name=class_name,
                record=record,
                token=destination_token)

            # Delete record from incoming area
            incoming_delete_record(
                service_url=service_url,
                collection=collection,
                label=label,
                pid=record['pid'],
                token=curator_token,
            )

    if output is not None:
        print(json.dumps(output, ensure_ascii=False))
    return 0
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'auto-curate'

View file

@ -0,0 +1,93 @@
import json
import click
from ...communicate import (
HTTPError,
incoming_delete_record,
incoming_read_records,
)
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'clean-incoming'
@click.command(short_help='Remove all records from an inbox of a dump-things collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.argument(
    'inbox_label',
    metavar='INBOX_LABEL',
)
@click.option(
    '--list-only', '-l',
    default=False,
    is_flag=True,
    help='only list records in the inbox, do not remove them',
)
def cli(
    obj,
    service_url,
    collection,
    inbox_label,
    list_only,
):
    """Remove all records from an incoming area of a collection on a dump-things-service

    This command removes all records from the inbox with label INBOX_LABEL in
    the collection COLLECTION on the dump-things service given by SERVICE_URL.

    A token with curator rights has to be provided.
    """
    try:
        exit_code = clean_incoming(
            obj,
            service_url,
            collection,
            inbox_label,
            list_only,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def clean_incoming(
    obj,
    service_url,
    collection,
    inbox_label,
    list_only,
):
    """Delete, or only list, all records of one inbox of a collection.

    Args:
        obj: token used to authenticate, `None` if no token was given.
        service_url: URL of the service.
        collection: name of the collection.
        inbox_label: label of the inbox that should be emptied.
        list_only: if true, print each record as a JSON line and delete
            nothing.

    Returns:
        int: 0 on success, 1 if no token was provided.
    """
    if obj is None:
        click.echo('ERROR: token not provided', err=True)
        return 1

    # Settings shared by the read request and all delete requests
    target = dict(
        service_url=service_url,
        collection=collection,
        label=inbox_label,
        token=obj,
    )
    for record, _, _, _, _ in incoming_read_records(**target):
        if not list_only:
            # Delete record from incoming area
            incoming_delete_record(pid=record['pid'], **target)
        else:
            click.echo(json.dumps(record, ensure_ascii=False))
    return 0

View file

@ -0,0 +1,262 @@
import json
from functools import partial
import click
from ...communicate import (
HTTPError,
collection_read_records,
collection_read_record_with_pid,
collection_read_records_of_class,
curated_read_records,
curated_read_records_of_class,
curated_read_record_with_pid,
incoming_read_labels,
incoming_read_records,
incoming_read_records_of_class,
incoming_read_record_with_pid,
)
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'get-records'
@click.command(short_help='Get records from a dump-things collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.option(
    # Takes the class name as value; it is passed on as `class_name`
    '--class', '-C', 'cls',
    metavar='CLASS',
    help='only read records of this class, ignored if "--pid" is provided',
)
@click.option(
    '--format', '-f', 'format_',
    type=click.Choice(('json', 'ttl'), case_sensitive=False),
    default='json',
    help='request records in a specific format. (NOTE: not all endpoints support the "format"-parameter)',
)
@click.option(
    '--pid', '-p',
    help='the pid of the record that should be read',
)
@click.option(
    '--incoming', '-i',
    metavar='LABEL',
    help='read from the collection inbox with label LABEL, if LABEL is "-", return labels of all collection inboxes',
)
@click.option(
    '--curated', '-c',
    default=False,
    is_flag=True,
    help='read from the curated area of the collection. (Note: requires a token with curator rights)',
)
@click.option(
    # Takes the value to match; it is passed on as `matching`
    '--matching', '-m',
    metavar='PATTERN',
    help='return only records that have a matching value (use % as wildcard). Ignored if "--pid" is provided. (Note: not all endpoints and backends support matching)',
)
@click.option(
    '--page-size', '-s',
    type=click.IntRange(1, 100),
    default=100,
    help='set the page size (default: 100). (ignored if "--pid" is provided)'
)
@click.option(
    '--first-page', '-F',
    type=click.INT,
    default=1,
    help='the first page to return (default: 1). (ignored if "--pid" is provided)'
)
@click.option(
    '--last-page', '-l',
    type=click.INT,
    help='the last page to return, if not given, all pages will be returned. (ignored if "--pid" is provided)',
    default=None,
)
@click.option(
    '--stats',
    default=False,
    is_flag=True,
    help='show the number of records and pages and exit. (ignored if "--pid" is provided)',
)
@click.option(
    '--pagination', '-P',
    default=False,
    is_flag=True,
    help='show pagination information (each record from an paginated endpoint is returned as [<record>, <current page number>, <total number of pages>, <page size>, <total number of items>]. (ignored if "--pid" is provided)',
)
def cli(
    obj,
    service_url,
    collection,
    cls,
    format_,
    pid,
    incoming,
    curated,
    matching,
    page_size,
    first_page,
    last_page,
    stats,
    pagination,
):
    """Get records from a collection on a dump-things-service

    This command lists records that are stored in collection COLLECTION of the
    dump-things service SERVICE_URL. By
    default, all records that are readable with the given token, or the default
    token, will be displayed. The output format is JSONL (JSON lines), where
    every line contains a record or a record with paging information. If `ttl`
    is chosen as format of the output records, the record content will be a string
    that contains a TTL-documents.

    The command supports reading from the curated area only, reading from incoming
    areas, or reading a record with a given PID.

    Pagination information is returned for paginated results, when requested with
    `-P/--pagination`. All results are paginated except "get a record with a given PID"
    and "get the list of incoming zone labels".

    For reading from curated or incoming areas, a token with curator rights has
    to be provided.
    """
    try:
        exit_code = get_records(
            obj,
            service_url,
            collection,
            cls,
            format_,
            pid,
            incoming,
            curated,
            matching,
            page_size,
            first_page,
            last_page,
            stats,
            pagination,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def get_records(
    obj,
    service_url,
    collection,
    cls,
    format_,
    pid,
    incoming,
    curated,
    matching,
    page_size,
    first_page,
    last_page,
    stats,
    pagination,
):
    """Read records, or inbox labels, from a collection and print them.

    The kind of request is selected in this order: list inbox labels
    (`incoming == '-'`), read a single record (`pid`), read records of a
    class (`cls`), read all records. Within each kind, `curated` and
    `incoming` select the area that is read; without them the collection
    endpoint is used and `format_` is passed on.

    Args:
        obj: token used to authenticate, `None` if no token was given.
        service_url: URL of the service.
        collection: name of the collection.
        cls: class name, if given only records of this class are read.
        format_: requested record format, only used for collection reads.
        pid: PID of a single record to read.
        incoming: inbox label to read from, or '-' to list all labels.
        curated: read from the curated area.
        matching: value that records have to match.
        page_size: page size for paginated reads.
        first_page: first page to read.
        last_page: last page to read, `None` for all pages.
        stats: print only pagination statistics.
        pagination: print each result with its pagination information.

    Returns:
        int: 0 on success, 1 if `incoming` and `curated` are both given.
    """
    token = obj
    if token is None:
        click.echo('WARNING: no token provided', err=True)

    if incoming and curated:
        click.echo(
            'ERROR: -i/--incoming and -c/--curated are mutually exclusive',
            err=True,
        )
        return 1

    kwargs = dict(
        service_url=service_url,
        collection=collection,
        token=token,
    )

    if incoming == '-':
        result = incoming_read_labels(**kwargs)
        print('\n'.join(
            map(
                partial(json.dumps, ensure_ascii=False),
                result)))
        return 0

    elif pid:
        # Warn about options that have no effect on a single-record read.
        # Options with a default are compared against that default, because
        # the default value itself does not indicate a user request.
        for was_given, argument_name in (
            (bool(matching), '-m/--matching'),
            (page_size != 100, '-s/--page-size'),
            (first_page != 1, '-F/--first-page'),
            (last_page is not None, '-l/--last-page'),
            (bool(stats), '--stats'),
            (bool(cls), '-C/--class'),
        ):
            if was_given:
                click.echo(
                    f'WARNING: {argument_name} ignored because "-p/--pid" is provided',
                    err=True,
                )
        kwargs['pid'] = pid
        if curated:
            result = curated_read_record_with_pid(**kwargs)
        elif incoming:
            kwargs['label'] = incoming
            result = incoming_read_record_with_pid(**kwargs)
        else:
            kwargs['format'] = format_
            result = collection_read_record_with_pid(**kwargs)
        print(json.dumps(result, ensure_ascii=False))
        return 0

    elif cls:
        kwargs.update(dict(
            class_name=cls,
            matching=matching,
            page=first_page,
            size=page_size,
            last_page=last_page,
        ))
        if curated:
            result = curated_read_records_of_class(**kwargs)
        elif incoming:
            kwargs['label'] = incoming
            result = incoming_read_records_of_class(**kwargs)
        else:
            kwargs['format'] = format_
            result = collection_read_records_of_class(**kwargs)

    else:
        kwargs.update(dict(
            matching=matching,
            page=first_page,
            size=page_size or 100,
            last_page=last_page,
        ))
        if curated:
            result = curated_read_records(**kwargs)
        elif incoming:
            kwargs['label'] = incoming
            result = incoming_read_records(**kwargs)
        else:
            kwargs['format'] = format_
            result = collection_read_records(**kwargs)

    if stats:
        # NOTE(review): assumes each result is [<record>, <current page>,
        # <total pages>, <page size>, <total items>], as described in the
        # help of `--pagination` -- confirm against the read functions.
        first_result = next(iter(result), None)
        if first_result is None:
            click.echo(
                'WARNING: no records found, no statistics available',
                err=True,
            )
        else:
            print(json.dumps(first_result[2:], ensure_ascii=False))
        return 0

    if pagination:
        for record in result:
            print(json.dumps(record, ensure_ascii=False))
    else:
        # Without `--pagination` only the record itself is printed
        for record in result:
            print(json.dumps(record[0], ensure_ascii=False))
    return 0

View file

@ -0,0 +1,86 @@
import json
import click
from ...communicate import (
HTTPError,
incoming_read_labels,
incoming_read_records,
)
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'list-incoming'
@click.command(short_help='List inboxes of a dump-things collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.option(
    '--show-records', '-s',
    default=False,
    is_flag=True,
    help='list records in inboxes',
)
def cli(
    obj,
    service_url,
    collection,
    show_records,
):
    """List labels of incoming areas of a collection on a dump-things-service

    This command lists the labels of the incoming areas of the collection
    COLLECTION on the dump-things service given by SERVICE_URL.

    A token with curator rights has to be provided.
    """
    try:
        exit_code = list_incoming(
            obj,
            service_url,
            collection,
            show_records,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def list_incoming(
    obj,
    service_url,
    collection,
    show_records,
):
    """Print the inbox labels of a collection, optionally with their records.

    Args:
        obj: token used to authenticate, `None` if no token was given.
        service_url: URL of the service.
        collection: name of the collection.
        show_records: if true, print a JSON object that maps each label to
            the list of records in that inbox; if `False`, print a JSON
            list of the labels.

    Returns:
        int: 0 on success, 1 if no token was provided.
    """
    if obj is None:
        click.echo('ERROR: token not provided', err=True)
        return 1

    # Settings shared by all requests
    common = dict(service_url=service_url, collection=collection, token=obj)

    records_by_label = {}
    for label in incoming_read_labels(**common):
        records_by_label[label] = []
        if not show_records:
            continue
        records_by_label[label].extend(
            record
            for record, _, _, _, _ in incoming_read_records(label=label, **common)
        )

    # Without records only the labels, i.e. the keys, are of interest
    output = list(records_by_label) if show_records is False else records_by_label
    click.echo(json.dumps(output, indent=2, ensure_ascii=False))
    return 0

View file

@ -0,0 +1,116 @@
import json
import logging
import sys
import click
from ...communicate import (
HTTPError,
curated_write_record,
collection_write_record,
)
# Module-level logger of this subcommand
logger = logging.getLogger('post-records')
@click.command(short_help='Post records to an inbox or the curated area of a dump-things collection')
@click.pass_obj
@click.argument(
    'service_url',
    metavar='SERVICE_URL',
)
@click.argument(
    'collection',
    metavar='COLLECTION',
)
@click.argument(
    'cls',
    metavar='CLASS',
)
@click.option(
    '--curated',
    default=False,
    is_flag=True,
    help='store record directly in curated area instead of an inbox. (Note: requires a token with curator rights)'
)
def cli(
    obj,
    service_url,
    collection,
    cls,
    curated,
):
    """Read records of class CLASS from standard input and store them in
    the collection COLLECTION on the service SERVICE_URL. Records should be
    provided in JSON-lines format. Note: all records are assumed to be of class
    CLASS. To submit records of multiple classes, the subcommand has to be
    invoked multiple times, once for each class.

    If the `--curated`-option is provided, the records will be stored directly
    in the curated area of the collection without any alterations, i.e, no
    annotations will be added.

    If no `--curated`-option is provided, the record will be stored in the
    inbox of the user that is associated with the token, and the record will be
    annotated with the submission time and the user that performed
    the submission.

    A token is required and will be used to authenticate the requests.
    If the `--curated`-option is provided, the token must have
    curator-rights."""
    try:
        exit_code = post_records(
            obj,
            service_url,
            collection,
            cls,
            curated,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def post_records(
    obj,
    service_url,
    collection,
    cls,
    curated,
):
    """Read JSON-lines records from standard input and store them.

    Every non-empty input line is parsed as JSON and written to the
    collection. A line that cannot be parsed, or a record that cannot be
    written, is reported on standard error and processing continues with the
    next line. A dot is printed for every stored record.

    Args:
        obj: token used to authenticate, `None` if no token was given.
        service_url: URL of the service.
        collection: name of the collection.
        cls: class name that is used for all records.
        curated: if true, write with `curated_write_record`, otherwise with
            `collection_write_record`.

    Returns:
        int: 0 after all input was processed, 1 if no token was provided.
    """
    token = obj
    if token is None:
        click.echo('ERROR: no token provided', err=True)
        return 1

    if curated:
        write_record = curated_write_record
    else:
        write_record = collection_write_record

    posted = False
    for line_number, line in enumerate(sys.stdin, start=1):
        # Blank lines, e.g. a trailing newline, are not records
        if not line.strip():
            continue

        # Report a malformed line and keep going, in the same way as a
        # failed write is handled below.
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            click.echo(
                f'ERROR: invalid JSON in input line {line_number}: {e}',
                err=True,
            )
            continue

        try:
            write_record(
                service_url=service_url,
                collection=collection,
                class_name=cls,
                record=record,
                token=token,
            )
        except Exception as e:
            click.echo(f'ERROR: {e}', err=True)
        else:
            posted = True
            click.echo('.', nl=False)

    if posted:
        # echo a final newline
        click.echo('')
    return 0
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'post-records'

View file

@ -0,0 +1,141 @@
import json
import logging
import click
from ...communicate import (
HTTPError,
get_paginated,
)
# Module-level logger of this subcommand
logger = logging.getLogger('read-pages')
@click.command(short_help='Read records from paginated dump-things endpoints')
@click.pass_obj
@click.argument(
    'url',
    metavar='URL',
)
@click.option(
    '--page-size', '-s',
    type=click.INT,
    default=100,
    help='set the page size (1 - 100) (default: 100)'
)
@click.option(
    '--first-page', '-F',
    type=click.INT,
    default=1,
    help='the first page to return (default: 1)'
)
@click.option(
    '--last-page', '-l',
    type=click.INT,
    help='the last page to return (default: None (return all pages)',
)
@click.option(
    '--stats',
    is_flag=True,
    default=False,
    help='show information about the number of records and pages and exit, the format is is returned as [<total number of pages>, <page size>, <total number of items>]',
)
@click.option(
    '--format', '-f', 'format_',
    type=click.Choice(('json', 'ttl'), case_sensitive=False),
    default='json',
    help='request output records in a specific format. (NOTE: not all endpoints support the "format"-parameter)',
)
@click.option(
    '--matching', '-m',
    help='return only records that have a matching value (use % as wildcard). (NOTE: not all endpoints and storage-backends support matching.)',
)
@click.option(
    '--pagination', '-P',
    is_flag=True,
    help='show pagination information (each record from an paginated endpoint is returned as [<record>, <current page number>, <total number of pages>, <page size>, <total number of items>]',
)
def cli(
    obj,
    url,
    page_size,
    first_page,
    last_page,
    stats,
    format_,
    matching,
    pagination,
):
    """Read paginated endpoint

    This command lists all records that are available via a paginated endpoints from
    a dump-things-service, e.g., given by URL

    https://<service-location>/<collection>/records/p/
    """
    try:
        exit_code = read_pages(
            obj,
            url,
            page_size,
            first_page,
            last_page,
            stats,
            format_,
            matching,
            pagination,
        )
    except HTTPError as e:
        click.echo(f'ERROR: {e}: {e.response.text}', err=True)
        exit_code = 1

    # click discards the return value of a command in standalone mode, so a
    # failure has to be reported through the context to reach the shell.
    ctx = click.get_current_context(silent=True)
    if exit_code and ctx is not None:
        ctx.exit(exit_code)
    return exit_code
def read_pages(
    obj,
    url,
    page_size,
    first_page,
    last_page,
    stats,
    format_,
    matching,
    pagination,
):
    """Read the results of a paginated endpoint and print them as JSON lines.

    Args:
        obj: token used to authenticate, `None` if no token was given (a
            warning is printed in that case).
        url: URL of the paginated endpoint.
        page_size: page size that is requested.
        first_page: first page to read.
        last_page: last page to read, `None` for all pages.
        stats: print only the pagination statistics of the first result
            and return.
        format_: value of the `format` request parameter.
        matching: value of the `matching` request parameter; the parameter
            is only sent if this is not `None`.
        pagination: print each result with its pagination information
            instead of the record only.

    Returns:
        int: always 0.
    """
    token = obj
    if token is None:
        click.echo('WARNING: no token provided', err=True)

    parameters = {'format': format_}
    if matching is not None:
        parameters['matching'] = matching

    result = get_paginated(
        url=url,
        token=token,
        first_page=first_page,
        page_size=page_size,
        last_page=last_page,
        parameters=parameters,
    )

    if stats:
        # An endpoint without results yields nothing; report that instead of
        # failing with an unhandled `StopIteration`.
        record = next(iter(result), None)
        if record is None:
            click.echo(
                'WARNING: no records found, no statistics available',
                err=True,
            )
        else:
            click.echo(json.dumps(record[2:], ensure_ascii=False))
        return 0

    if pagination:
        for record in result:
            click.echo(json.dumps(record, ensure_ascii=False))
    else:
        # Without `--pagination` only the record itself is printed
        for record in result:
            click.echo(json.dumps(record[0], ensure_ascii=False))
    return 0
# Name of the `dtc` subcommand that is implemented by this module
subcommand_name = 'read-pages'

View file

@ -1,171 +0,0 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from functools import partial
from ..communicate import (
HTTPError,
collection_read_records,
collection_read_records_of_class,
collection_read_record_with_pid,
curated_read_records,
curated_read_records_of_class,
curated_read_record_with_pid,
incoming_read_labels,
incoming_read_records,
incoming_read_records_of_class,
incoming_read_record_with_pid,
)
# Name of the environment variable from which the token is read
token_name = 'DUMPTHINGS_TOKEN'
# Description text that is passed to the argument parser
description = f"""Get records from a collection on a dump-things-service
This command lists records that are stored in a dump-things-service. By
default all records that are readable with the given token, or the default
token, will be displayed. The output format is JSONL (JSON lines), where
every line contains a record or a record with paging information. If `ttl`
is chosen as format of the output records, the record content will be a string
that contains a TTL-documents.
The command supports to read from the curated area only, to read from incoming
areas, or to read records with a given PID.
Pagination information is returned for paginated results, when requested with
`-P/--pagination`. All results are paginated except "get a record with a given PID"
and "get the list of incoming zone labels".
If the environment variable "{token_name}" is set, its content will be used
as token to authenticate against the dump-things-service.
"""
def _main():
argument_parser = argparse.ArgumentParser(
description=description,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
argument_parser.add_argument('service_url')
argument_parser.add_argument('collection')
argument_parser.add_argument('-C', '--class', dest='class_name', help='only read records of this class, ignored if "--pid" is provided')
argument_parser.add_argument('-f', '--format', help='format of the output records ("json" or "ttl")')
argument_parser.add_argument('-p', '--pid', help='the pid of the record that should be read')
argument_parser.add_argument('-i', '--incoming', metavar='LABEL', help='read from incoming area with the given label in the collection, if LABEL is "-", return the labels')
argument_parser.add_argument('-c', '--curated', action='store_true', help='read from the curated area of the collection')
argument_parser.add_argument('-m', '--matching', help='return only records that have a matching value (use % as wildcard). Ignored if "--pid" is provided. (NOTE: not all endpoints and backends support matching.)')
argument_parser.add_argument('-s', '--page-size', type=int, help='set the page size (1 - 100) (default: 100), ignored if "--pid" is provided')
argument_parser.add_argument('-F', '--first-page', type=int, help='the first page to return (default: 1), ignored if "--pid" is provided')
argument_parser.add_argument('-l', '--last-page', type=int, default=None, help='the last page to return (default: None (return all pages), ignored if "--pid" is provided')
argument_parser.add_argument('--stats', action='store_true', help='show the number of records and pages and exit, ignored if "--pid" is provided')
argument_parser.add_argument('-P', '--pagination', action='store_true', help='show pagination information (each record from an paginated endpoint is returned as [<record>, <current page number>, <total number of pages>, <page size>, <total number of items>]')
arguments = argument_parser.parse_args()
token = os.environ.get(token_name)
if token is None:
print(f'WARNING: {token_name} not set', file=sys.stderr, flush=True)
if arguments.incoming and arguments.curated:
print(
'ERROR: -i/--incoming and -c/--curated are mutually exclusive',
file=sys.stderr,
flush=True)
return 1
kwargs = dict(
service_url=arguments.service_url,
collection=arguments.collection,
token=token,
)
if arguments.incoming == '-':
result = incoming_read_labels(**kwargs)
print('\n'.join(
map(
partial(json.dumps, ensure_ascii=False),
result)))
return 0
elif arguments.pid:
for argument_value, argument_name in (
(arguments.matching, '-m/--matching'),
(arguments.page_size, '-s/--page_size'),
(arguments.first_page, '-f/--first_page'),
(arguments.last_page, '-l/--last_page'),
(arguments.stats, '--stats'),
(arguments.class_name, '-c/--class'),
):
if argument_value:
print(
f'WARNING: {argument_name} ignored because "-p/--pid" is provided',
file=sys.stderr,
flush=True)
kwargs['pid'] = arguments.pid
if arguments.curated:
result = curated_read_record_with_pid(**kwargs)
elif arguments.incoming:
kwargs['label'] = arguments.incoming
result = incoming_read_record_with_pid(**kwargs)
else:
kwargs['format'] = arguments.format
result = collection_read_record_with_pid(**kwargs)
print(json.dumps(result, ensure_ascii=False))
return 0
elif arguments.class_name:
kwargs.update(dict(
class_name=arguments.class_name,
matching=arguments.matching,
page=arguments.first_page or 1,
size=arguments.page_size or 100,
last_page=arguments.last_page,
))
if arguments.curated:
result = curated_read_records_of_class(**kwargs)
elif arguments.incoming:
kwargs['label'] = arguments.incoming
result = incoming_read_records_of_class(**kwargs)
else:
kwargs['format'] = arguments.format
result = collection_read_records_of_class(**kwargs)
else:
kwargs.update(dict(
matching=arguments.matching,
page=arguments.first_page or 1,
size=arguments.page_size or 100,
last_page=arguments.last_page,
))
if arguments.curated:
result = curated_read_records(**kwargs)
elif arguments.incoming:
kwargs['label'] = arguments.incoming
result = incoming_read_records(**kwargs)
else:
kwargs['format'] = arguments.format
result = collection_read_records(**kwargs)
if arguments.pagination:
for record in result:
print(json.dumps(record, ensure_ascii=False))
else:
for record in result:
print(json.dumps(record[0], ensure_ascii=False))
return 0
def main():
    """Run the command and map HTTP failures to exit status 1.

    Returns:
        The value returned by `_main()`, or 1 if it raised `HTTPError`.
    """
    exit_status = 1
    try:
        exit_status = _main()
    except HTTPError as http_error:
        # Report the error together with the body of the server's response.
        message = f'ERROR: {http_error}: {http_error.response.text}'
        print(message, file=sys.stderr, flush=True)
    return exit_status


if __name__ == '__main__':
    sys.exit(main())

View file

@ -1,59 +0,0 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from ..communicate import (
collection_write_record,
curated_write_record,
)
def main():
    """Post records read from stdin (one JSON document per line) to a service.

    Each non-blank input line is parsed as JSON and written to the given
    collection as a record of the given class. With `--curated` the record
    is written via `curated_write_record`, otherwise via
    `collection_write_record`. A `.` is printed for every record that was
    posted; failures are reported on stderr and do not stop processing.

    Returns:
        0 if every record was posted, 1 if at least one line could not be
        parsed or posted.
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument('base_url')
    argument_parser.add_argument('collection')
    argument_parser.add_argument('cls', metavar='class')
    argument_parser.add_argument(
        '--curated',
        action='store_true',
        help='bypass inbox, requires curator token',
    )
    arguments = argument_parser.parse_args()

    token = os.environ.get('DUMPTHINGS_TOKEN')
    if token is None:
        print(
            'WARNING: environment variable DUMPTHINGS_TOKEN not set',
            file=sys.stderr,
            flush=True,
        )

    write_record = (
        curated_write_record if arguments.curated else collection_write_record
    )

    posted = False
    failed = False
    for line in sys.stdin:
        # Blank lines (e.g. a trailing newline) carry no record.
        if not line.strip():
            continue
        try:
            # Parsing happens inside the `try` so that one malformed line is
            # reported like any other failure instead of aborting the run.
            record = json.loads(line)
            write_record(
                service_url=arguments.base_url,
                collection=arguments.collection,
                class_name=arguments.cls,
                record=record,
                token=token,
            )
        except Exception as e:
            # Deliberately broad: processing is best-effort per record.
            failed = True
            print(f'Error: {e}', file=sys.stderr, flush=True)
        else:
            posted = True
            print('.', end='', flush=True)

    if posted:
        # final newline
        print('')
    return 1 if failed else 0


if __name__ == '__main__':
    sys.exit(main())

View file

@ -1,87 +0,0 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from ..communicate import (
HTTPError,
get_paginated,
)
# Name of the environment variable that holds the authentication token.
token_name = 'DUMPTHINGS_TOKEN'

# Help text shown by `--help`; printed verbatim (RawDescriptionHelpFormatter).
description = f"""Read paginated endpoint
This command lists all records that are available via paginated endpoints from
a dump-things-service, e.g., from:
https://<service-location>/<collection>/records/p/
If the environment variable "{token_name}" is set, its content will be used
as token to authenticate against the dump-things-service.
"""


def _main():
    """Parse the command line, read a paginated endpoint, and print the result.

    With `--stats` only the pagination statistics of the first entry are
    printed. Otherwise every entry is printed as one JSON document per line,
    either with pagination information (`-P`) or as the bare record.

    Returns:
        0 in all cases that do not raise.
    """
    argument_parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    argument_parser.add_argument(
        'url',
        help='url of the paginated endpoint of the dump-things-service',
    )
    argument_parser.add_argument(
        '-s', '--page-size',
        type=int,
        default=100,
        help='set the page size (1 - 100) (default: 100)',
    )
    argument_parser.add_argument(
        '-F', '--first-page',
        type=int,
        default=1,
        help='the first page to return (default: 1)',
    )
    argument_parser.add_argument(
        '-l', '--last-page',
        type=int,
        default=None,
        help='the last page to return (default: None (return all pages)',
    )
    argument_parser.add_argument(
        '--stats',
        action='store_true',
        help='show information about the number of records and pages and exit, the format is is returned as [<total number of pages>, <page size>, <total number of items>]',
    )
    argument_parser.add_argument(
        '-f', '--format',
        help='format of the output records ("json" or "ttl"). (NOTE: not all endpoints support the format parameter.)',
    )
    # argparse applies %-formatting to help strings, so a literal percent
    # sign has to be written as `%%`; a bare `%` garbles the `--help` output.
    argument_parser.add_argument(
        '-m', '--matching',
        help='return only records that have a matching value (use %% as wildcard). (NOTE: not all endpoints and backends support matching.)',
    )
    argument_parser.add_argument(
        '-P', '--pagination',
        action='store_true',
        help='show pagination information (each record from an paginated endpoint is returned as [<record>, <current page number>, <total number of pages>, <page size>, <total number of items>]',
    )
    arguments = argument_parser.parse_args()

    token = os.environ.get(token_name)
    if token is None:
        print(f'WARNING: {token_name} not set', file=sys.stderr, flush=True)

    # `matching` is only sent when it was given on the command line.
    parameters = {'format': arguments.format}
    if arguments.matching is not None:
        parameters['matching'] = arguments.matching

    result = get_paginated(
        url=arguments.url,
        token=token,
        first_page=arguments.first_page,
        page_size=arguments.page_size,
        last_page=arguments.last_page,
        parameters=parameters,
    )

    if arguments.stats:
        # The statistics are the tail of the first entry. Use a default so
        # that an endpoint without entries does not end in an unhandled
        # StopIteration.
        first_entry = next(result, None)
        if first_entry is None:
            print(
                'WARNING: no records returned, statistics unavailable',
                file=sys.stderr,
                flush=True)
        else:
            print(json.dumps(first_entry[2:], ensure_ascii=False))
        return 0

    if arguments.pagination:
        for record in result:
            print(json.dumps(record, ensure_ascii=False))
    else:
        # Without `-P`, print only the record itself (first tuple element).
        for record in result:
            print(json.dumps(record[0], ensure_ascii=False))
    return 0
def main():
    """Run the command and map HTTP failures to exit status 1.

    Returns:
        The value returned by `_main()`, or 1 if it raised `HTTPError`.
    """
    exit_status = 1
    try:
        exit_status = _main()
    except HTTPError as http_error:
        # Report the error together with the body of the server's response.
        message = f'ERROR: {http_error}: {http_error.response.text}'
        print(message, file=sys.stderr, flush=True)
    return exit_status


if __name__ == '__main__':
    sys.exit(main())

View file

@ -1,13 +1,14 @@
[project]
name = "dump-things-pyclient"
version = "0.1.4"
description = "A client library and some CLI command for dump-things-services"
version = "0.2.0"
description = "A client library and CLI commands for dump-things-services"
readme = "README.md"
requires-python = ">=3.11"
authors = [
{name="Christian Mönch", email="christian.moench@web.de"},
]
dependencies = [
"click>=8.3.1",
"requests>=2.32.5",
]
@ -20,8 +21,5 @@ tests = [
]
[project.scripts]
auto-curate = "dump_things_pyclient.commands.auto_curate:main"
read-pages = "dump_things_pyclient.commands.read_pages:main"
get-records = "dump_things_pyclient.commands.get_records:main"
dtc = "dump_things_pyclient.commands.dtc:cli"
json2ttl = "dump_things_pyclient.commands.json2ttl:main"
post-records = "dump_things_pyclient.commands.post_records:main"

4
uv.lock generated
View file

@ -369,9 +369,10 @@ wheels = [
[[package]]
name = "dump-things-pyclient"
version = "0.1.3"
version = "0.1.4"
source = { virtual = "." }
dependencies = [
{ name = "click" },
{ name = "requests" },
]
@ -385,6 +386,7 @@ ttl = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.3.1" },
{ name = "dump-things-service", marker = "extra == 'ttl'", specifier = ">=5.3.0" },
{ name = "pytest", marker = "extra == 'tests'", specifier = ">=9.0.1" },
{ name = "requests", specifier = ">=2.32.5" },