dump-things-pyclient/dump_things_pyclient/tests/common.py
Christian Monch 80af17bb04
All checks were successful
Test execution / Test-all (push) Successful in 32s
add --keep-inboxes option to dtc auto-curate
2026-03-24 11:43:03 +01:00

105 lines
3.4 KiB
Python

import random
from pathlib import Path
from typing import Generator
import yaml
from sqlalchemy import union
from dump_things_pyclient.communicate import collection_write_record
# Common prefix of the PIDs of all automatically created test records.
# `_create_unique_records` prepends it to the `pid` value of every record
# it generates.
default_prefix = 'https://x.y.z/test/'
def read_records_from_store(
    store: Path,
    collection: str = 'collection_1',
    incoming: str | None = None,
    class_name: str = '*',
    remove_keys: list | None = None,
) -> Generator[tuple[Path, str | None, dict], None, None]:
    """Read records from a dumpthings-"recorddir" backend

    The store configuration is read from `<store>/.dumpthings.yaml`. Only
    the directory entry of the area that is actually read (`curated` or
    `incoming`) has to be present in the configuration of `collection`.

    By default records are read from the curated area. If records should
    be read from the incoming area, set `incoming` to a glob expression that
    matches the user IDs for which records should be read. Use '*' for all
    user IDs (the value of `incoming` is used as a glob expression).

    By default all classes are returned because the glob expression '*' is
    used for the classes. If `class_name` is set, the value will be used as
    glob expression for classes. For example, `*Document` would match
    `XYZDocument` and `Document`.

    If `remove_keys` is given and not empty, all keys that are specified in
    `remove_keys` will be removed from the records before they are yielded.
    For example, remove annotations by specifying `['annotations']`.

    Yields tuples of (record_path, user_id or None, cleaned record content).
    `user_id` is the first path component below the incoming directory, it
    is `None` for records that are read from the curated area.
    """
    config = yaml.safe_load((store / '.dumpthings.yaml').read_text())
    collection_config = config['collections'][collection]

    glob_expression = f'{class_name}/**/*.yaml'
    if incoming:
        # Look up only the area that is read, so that a collection without
        # the other area's entry can still be processed.
        base_dir = store / collection_config['incoming']
        glob_expression = f'{incoming}/' + glob_expression
        # The path component directly below `base_dir` is the user ID. Its
        # index is the same for all matches, determine it once.
        user_id_index = len(base_dir.parts)
    else:
        base_dir = store / collection_config['curated']
        user_id_index = None

    for record_path in base_dir.glob(glob_expression):
        # Configuration files are not records, skip them if the glob
        # expression picks them up.
        if record_path.name == '.dumpthings.yaml':
            continue

        user_id = (
            None if user_id_index is None
            else record_path.parts[user_id_index]
        )
        record = yaml.safe_load(record_path.read_text())
        if remove_keys:
            record = {
                key: value
                for key, value in record.items()
                if key not in remove_keys
            }
        yield record_path, user_id, record
def add_unique_records(
    port: int,
    collection: str,
    number_of_entries: int,
    unifying_string: str,
    token: str | None = None,
) -> dict:
    """Create test records and write them to a local service

    `number_of_entries` records are generated by `_create_unique_records`,
    with `unifying_string` embedded in their values. Each record is written
    via `collection_write_record` as class `Person` to `collection` of the
    service at `http://127.0.0.1:<port>/`. If `token` is given, it is passed
    on to `collection_write_record`.

    Returns the generated records as a dictionary that maps the running
    index of a record to the record.
    """
    created = _create_unique_records(number_of_entries, unifying_string)
    # All records go to the same service instance on localhost
    service_url = f'http://127.0.0.1:{port}/'
    for person in created.values():
        collection_write_record(
            service_url=service_url,
            collection=collection,
            class_name='Person',
            record=person,
            token=token,
        )
    return created
def _create_unique_records(
number_of_records: int,
unifying_string: str,
) -> dict[int, dict]:
return {
pid: {
'schema_type': 'test:Person',
'pid': default_prefix + f'person_{unifying_string}_{pid}',
'family_name': f'grieg_{unifying_string}_{pid}',
'given_name': f'erwin_{unifying_string}_{pid}',
}
for pid in range(number_of_records)
}