122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import sys
|
|
from argparse import ArgumentParser
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
|
|
from dump_things_service import config_file_name
|
|
from dump_things_service.abstract_config import read_config
|
|
from dump_things_service.backends.schema_type_layer import _SchemaTypeLayer
|
|
from dump_things_service.backends.sqlite import _SQLiteBackend
|
|
from dump_things_service.config import get_config, process_config
|
|
from dump_things_service.exceptions import CurieResolutionError
|
|
from dump_things_service.store.model_store import _ModelStore
|
|
from dump_things_service.utils import (
|
|
create_token_store,
|
|
get_config_labels,
|
|
get_on_disk_labels,
|
|
)
|
|
|
|
# Command line interface: a positional store root directory plus an optional
# explicit configuration file.
parser = ArgumentParser(
    prog='Check pids for resolvability',
    description=(
        'This command checks for pids that are in CURIE format and '
        'cannot be resolved.'
    ),
)
parser.add_argument('store', help='The root directory of the store.')
parser.add_argument(
    '-c', '--config',
    metavar='CONFIG_FILE',
    help=(
        "Read the configuration from 'CONFIG_FILE' instead of looking for "
        'it in the root directory of the store.'
    ),
)
|
|
|
|
|
|
def show_backend(model_store: _ModelStore) -> None:
    """Print the storage location of a model store's backend to stderr.

    If the store's backend is a ``_SchemaTypeLayer``, the backend wrapped by
    that layer is inspected instead. For a ``_SQLiteBackend`` the ``db_path``
    attribute is reported; for every other backend the ``root`` attribute is
    reported.

    :param model_store: the store whose backend location should be shown
    """
    backend = model_store.backend
    # Look through the schema-type layer to the backend it wraps.
    if isinstance(backend, _SchemaTypeLayer):
        backend = backend.backend

    location = (
        backend.db_path
        if isinstance(backend, _SQLiteBackend)
        else backend.root
    )
    print(f'Checking: {location}', file=sys.stderr)
|
|
|
|
|
|
def check_pids_in_stores(
    stores: Iterable[_ModelStore]
) -> int:
    """Count the pids in ``stores`` that cannot be resolved to an IRI.

    For every store, a progress line with the store URI is written to
    stderr. Each object's ``pid`` is passed to the store's ``pid_to_iri``;
    if that raises ``CurieResolutionError``, the pid and the store URI are
    printed to stdout and the pid is counted.

    :param stores: the stores whose objects should be checked
    :return: the number of pids for which resolution failed
    """
    unresolved = 0
    for model_store in stores:
        print('checking', model_store.get_uri(), file=sys.stderr)
        for entry in model_store.get_all_objects():
            candidate = entry.json_object['pid']
            try:
                model_store.pid_to_iri(candidate)
            except CurieResolutionError:
                # Report the offending pid together with its store on stdout,
                # progress messages go to stderr.
                print(candidate, model_store.get_uri())
                unresolved += 1

    return unresolved
|
|
|
|
|
|
def check_pids() -> int:
    """Check the pids of all curated and incoming stores of the instance.

    The instance configuration is obtained via ``get_config()``; the abstract
    configuration is read from the instance's store path.

    :return: the total number of unresolvable pids reported by
        ``check_pids_in_stores`` over all checked stores
    """
    instance_config = get_config()
    abstract_config = read_config(instance_config.store_path)

    # Start with the curated stores.
    unresolved = check_pids_in_stores(instance_config.curated_stores.values())

    # Continue with the incoming stores. An incoming store is either defined
    # in the configuration or was generated by an external authentication
    # source. Stores of the latter kind only manifest as directories in the
    # incoming area of a collection, so both label sources are merged.
    for collection, collection_info in instance_config.collections.items():
        labels_from_config = get_config_labels(instance_config, collection)
        labels_from_disk = get_on_disk_labels(
            store_path=instance_config.store_path,
            abstract_config=abstract_config,
            collection=collection,
        )

        # Create all token stores of the collection before checking any.
        incoming_stores = []
        for label in labels_from_config.union(labels_from_disk):
            incoming_stores.append(
                create_token_store(
                    instance_config,
                    collection,
                    instance_config.store_path / collection_info.incoming / label
                )
            )
        unresolved += check_pids_in_stores(incoming_stores)

    return unresolved
|
|
|
|
|
|
def main():
    """Run the pid check from the command line.

    Parses the arguments, hands the store path and configuration file to
    ``process_config``, and then runs ``check_pids``.

    :return: ``1`` if at least one unresolvable pid was found (a summary is
        written to stderr in that case), ``0`` otherwise
    """
    arguments = parser.parse_args()
    store_path = Path(arguments.store).absolute()

    # An explicitly given configuration file takes precedence; otherwise the
    # default configuration file name inside the store root is used.
    if arguments.config:
        config_file = Path(arguments.config)
    else:
        config_file = Path(store_path / config_file_name)

    process_config(
        store_path=store_path,
        config_file=config_file,
        order_by=['pid'],
        globals_dict=globals(),
    )

    unresolved = check_pids()
    if unresolved <= 0:
        return 0

    print(f'found {unresolved} unresolvable pids', file=sys.stderr)
    return 1
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|