105 lines
3.4 KiB
Python
105 lines
3.4 KiB
Python
import random
|
|
from pathlib import Path
|
|
from typing import Generator
|
|
|
|
import yaml
|
|
from sqlalchemy import union
|
|
|
|
from dump_things_pyclient.communicate import collection_write_record
|
|
|
|
# Prefix prepended to the `pid` value of every automatically created test
# record (see `_create_unique_records`).
default_prefix = 'https://x.y.z/test/'
|
|
|
|
|
|
def read_records_from_store(
    store: Path,
    collection: str = 'collection_1',
    incoming: str | None = None,
    class_name: str = '*',
    remove_keys: list | None = None,
) -> Generator[tuple[Path, str | None, dict], None, None]:
    """Read records from a dumpthings-"recorddir" backend

    The directory layout is taken from `<store>/.dumpthings.yaml`: the entry
    `collections.<collection>.curated` (or `.incoming`, see below) names the
    directory, relative to `store`, that is searched for `*.yaml` records.

    By default records are read from the curated area. If records should
    be read from the incoming area, set `incoming` to a glob expression that
    matches the user IDs for which records should be read. Use '*' for all
    user IDs (the value of `incoming` is used as a glob expression).

    By default all classes are returned because the glob expression '*' is
    used for the classes. If `class_name` is set, the value will be used as
    glob expression for classes. For example, `*Document` would match
    `XYZDocument` and `Document`.

    If `remove_keys` is given (and not empty), all keys that are specified in
    `remove_keys` will be removed from the records before they are returned.
    For example, remove annotations by specifying `['annotations']`.

    Raises `KeyError` if `collection` (or the requested area of it) is not
    present in the store configuration.

    Returns tuples of (record_path, user_id or None, cleaned record content)
    """
    # YAML files are read as UTF-8 explicitly instead of relying on the
    # platform's default encoding.
    config = yaml.safe_load(
        (store / '.dumpthings.yaml').read_text(encoding='utf-8')
    )
    collection_config = config['collections'][collection]

    # Only the area that is actually read has to be configured.
    glob_expression = f'{class_name}/**/*.yaml'
    if incoming:
        base_dir = store / collection_config['incoming']
        glob_expression = f'{incoming}/{glob_expression}'
    else:
        base_dir = store / collection_config['curated']

    # In the incoming area the first path component below `base_dir` is the
    # user ID directory. Its index is the same for every record, so compute
    # it once outside of the loop.
    user_id_index = len(base_dir.parts)

    for record_path in base_dir.glob(glob_expression):
        # Configuration files match `*.yaml` as well, they are not records.
        if record_path.name == '.dumpthings.yaml':
            continue

        user_id = record_path.parts[user_id_index] if incoming else None
        record = yaml.safe_load(record_path.read_text(encoding='utf-8'))
        if remove_keys:
            record = {
                key: value
                for key, value in record.items()
                if key not in remove_keys
            }
        yield record_path, user_id, record
|
|
|
|
|
|
def add_unique_records(
    port: int,
    collection: str,
    number_of_entries: int,
    unifying_string: str,
    token: str | None = None,
) -> dict:
    """Create unique test records and write them to a local service

    `number_of_entries` records are generated with `_create_unique_records`,
    `unifying_string` is embedded in their values. Each record is then
    written with `collection_write_record` as class `Person` into
    `collection` of the service at `http://127.0.0.1:<port>/`, passing
    `token` along.

    Returns the generated records as a dict that maps the running index of
    a record to the record itself.
    """
    created = _create_unique_records(number_of_entries, unifying_string)
    service_url = f'http://127.0.0.1:{port}/'

    for person in created.values():
        collection_write_record(
            service_url=service_url,
            collection=collection,
            class_name='Person',
            record=person,
            token=token,
        )

    return created
|
|
|
|
|
|
def _create_unique_records(
    number_of_records: int,
    unifying_string: str,
) -> dict[int, dict]:
    """Build `number_of_records` distinct `test:Person` records

    The records are numbered from 0 to `number_of_records - 1`. The number
    and `unifying_string` are embedded in `pid`, `family_name`, and
    `given_name` of every record; the `pid` additionally starts with
    `default_prefix`.

    Returns a dict that maps the number of a record to the record.
    """
    records: dict[int, dict] = {}
    for index in range(number_of_records):
        records[index] = {
            'schema_type': 'test:Person',
            'pid': default_prefix + f'person_{unifying_string}_{index}',
            'family_name': f'grieg_{unifying_string}_{index}',
            'given_name': f'erwin_{unifying_string}_{index}',
        }
    return records
|