dump-things-server/dump_things_service/utils.py
Christian Monch 16a1a4b29a ensure that record-dir config files are created
If a record-dir backend is created on a directory,
check if the config file exists, if not write it.
2026-06-12 11:48:15 +02:00

507 lines
16 KiB
Python

"""
To speed up processing, multiple indices could be introduced, e.g.:
- token representation -> token name
"""
from __future__ import annotations
import logging
import sys
from contextlib import contextmanager
from functools import reduce
from typing import (
TYPE_CHECKING,
Callable,
)
import fsspec
from fastapi import HTTPException
from rdflib import Graph
from dump_things_service import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_413_CONTENT_TOO_LARGE,
HTTP_503_SERVICE_UNAVAILABLE,
)
from dump_things_service.abstract_config import (
Configuration,
RecordDirBackendConfig,
TokenModes,
TokenPermission,
mode_mapping,
check_collection,
get_default_token_config,
get_token_config_for_representation_and_collection,
get_mapping_function_by_name,
)
from dump_things_service.auth import (
AuthenticationError,
AuthenticationInfo,
)
if TYPE_CHECKING:
from pathlib import Path
from dump_things_service import JSON
from dump_things_service.backends.record_dir import _RecordDirStore
from dump_things_service.backends.sqlite import _SQLiteBackend
from dump_things_service.instance_state import InstanceState
from dump_things_service.store.model_store import _ModelStore
logger = logging.getLogger('dump_things_service')
@contextmanager
def sys_path(paths: list[str | Path]):
    """Temporarily replace `sys.path` with the entries of `paths`.

    Every entry of `paths` is converted to `str` and the resulting list is
    installed as `sys.path` for the duration of the `with` block. The list
    object that was installed before is put back on exit, also when the
    block raises.
    """
    saved_search_path = sys.path
    try:
        sys.path = list(map(str, paths))
        yield
    finally:
        # Restore the very same list object that was active before.
        sys.path = saved_search_path
def read_url(url: str) -> str:
    """Return the complete text content found at `url`.

    The URL is opened in text mode (`'rt'`) via `fsspec.open` and read into
    memory in one go.
    """
    with fsspec.open(url, 'rt') as stream:
        return stream.read()
def cleaned_json(data: JSON, remove_keys: tuple[str, ...] = ('@type',)) -> JSON:
    """Return a copy of `data` without unwanted keys and `None` values.

    Dictionaries are rebuilt recursively: entries whose key is listed in
    `remove_keys` or whose value is `None` are dropped. Lists are processed
    element by element (their `None` elements are kept). Any other value is
    returned unchanged.
    """
    if isinstance(data, dict):
        cleaned = {}
        for key, value in data.items():
            if value is None or key in remove_keys:
                continue
            cleaned[key] = cleaned_json(value, remove_keys)
        return cleaned
    if isinstance(data, list):
        return [cleaned_json(element, remove_keys) for element in data]
    return data
def combine_ttl(documents: list[str]) -> str:
    """Merge several Turtle documents into one Turtle document.

    Each entry of `documents` is parsed as `ttl` into its own graph, the
    graphs are added up pairwise from left to right, and the result is
    serialized as `ttl` again. `documents` must not be empty, because the
    reduction has no initial value.
    """

    def merge(left, right):
        return left + right

    parsed_graphs = [
        Graph().parse(data=text, format='ttl')
        for text in documents
    ]
    combined = reduce(merge, parsed_graphs)
    return combined.serialize(format='ttl')
@contextmanager
def wrap_http_exception(
    exception_class: type[BaseException] = ValueError,
    status_code: int = HTTP_400_BAD_REQUEST,
    header: str = '',
):
    """Translate exceptions raised in the `with` block into `HTTPException`.

    Exceptions of type `exception_class` are re-raised as `HTTPException`
    with the given `status_code`. The detail text is the string form of the
    original exception, prefixed with `'<header>: '` if `header` is not
    empty. The original exception is chained as the cause.
    """
    try:
        yield
    except exception_class as error:
        message = str(error)
        if header:
            message = f'{header}: {message}'
        raise HTTPException(status_code=status_code, detail=message) from error
def join_default_token_permissions(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    permissions: TokenPermission,
    collection: str,
) -> TokenPermission:
    """Combine `permissions` with those of the collection's default token.

    A copy of `permissions` is returned in which `curated_read`,
    `incoming_read`, and `incoming_write` are OR-ed with the corresponding
    permissions of the default token of `collection`. The copy is returned
    unmodified if the default token is not defined, or if it is not
    configured for `collection`. `instance_state` is not used.
    """
    merged = permissions.model_copy()

    # Token and collection configuration may be created in separate steps,
    # so a collection can refer to a default token that does not exist yet.
    # In that case there is nothing to merge.
    default_name = abstract_configuration.collections[collection].default_token
    known_tokens = abstract_configuration.tokens
    if default_name not in known_tokens:
        return merged

    # For the same reason the default token might exist but not (yet) be
    # defined for this collection.
    default_collections = known_tokens[default_name].collections
    if collection not in default_collections:
        return merged

    default_permissions = mode_mapping[
        TokenModes(default_collections[collection].mode)
    ]
    for field in ('curated_read', 'incoming_read', 'incoming_write'):
        setattr(
            merged,
            field,
            getattr(permissions, field) | getattr(default_permissions, field),
        )
    return merged
def get_on_disk_labels(
    store_path: Path,
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the names of all incoming-label directories of `collection`.

    The labels are the names of the sub-directories of the collection's
    incoming directory below `store_path`. An empty set is returned if the
    collection has no incoming area configured or if the incoming directory
    does not exist (or is not a directory).

    `check_collection` is called first to validate `collection` against
    `abstract_config`.
    """
    check_collection(abstract_config, collection)

    # Test the configured value, not the joined path: a `Path` object is
    # always truthy, `store_path / None` raises `TypeError`, and joining an
    # empty string would yield `store_path` itself.
    incoming = abstract_config.collections[collection].incoming
    if not incoming:
        return set()

    incoming_path = store_path / incoming
    if not incoming_path.is_dir():
        return set()

    return {
        entry.name
        for entry in incoming_path.iterdir()
        if entry.is_dir()
    }
def authenticate_token(
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> AuthenticationInfo:
    """Authenticate a token against the auth sources of a collection.

    The authentication sources registered for `collection_name` in
    `instance_state.auth_sources` are tried in order. The result of the
    first source that does not raise `AuthenticationError` is used.

    Raises `HTTPException` (401) if no source yields a truthy result; the
    detail text lists the failure messages of the sources that were tried.
    """
    failures = []
    authenticated = None
    for provider in instance_state.auth_sources[collection_name]:
        try:
            logger.debug('trying to authenticate with %s', provider)
            authenticated = provider.authenticate(token_representation)
        except AuthenticationError as error:
            logger.debug(
                'Authentication provider %s could not '
                'authenticate token for collection %s: %s',
                provider,
                collection_name,
                str(error),
            )
            failures.append(
                f'{provider.__class__.__name__} failed with: {error}'
            )
        else:
            # The first provider that accepts the token wins.
            break

    if authenticated:
        return authenticated

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=(
            f'invalid token for collection {collection_name}: '
            + ', '.join(failures)
        ),
    )
def get_token_store(
    abstract_config: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> tuple[_ModelStore | None, TokenPermission, str]:
    """Return the incoming store, permissions, and user id for a token.

    The token is authenticated first (see `authenticate_token`, which raises
    `HTTPException` 401 on failure). The resulting tuple
    `(store, permissions, user_id)` is cached in
    `instance_state.incoming_stores[collection_name]` under
    `token_representation`.

    `store` is `None` if the token has neither incoming-read nor
    incoming-write permission, because no store is needed in that case.

    Raises `HTTPException` (401) if the token has incoming permissions but
    the collection does not define an incoming area.
    """
    auth_info = authenticate_token(
        instance_state,
        collection_name,
        token_representation,
    )
    permissions = auth_info.token_permission

    # Without any incoming permission there is no need to create a store.
    if not (permissions.incoming_read or permissions.incoming_write):
        entry = (None, permissions, auth_info.user_id)
        instance_state.incoming_stores[collection_name][token_representation] = entry
        return entry

    # The collection must define an incoming area.
    incoming = abstract_config.collections[collection_name].incoming
    if not incoming:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='No incoming area for collection ' + collection_name
        )

    # Reuse an already existing entry for this collection and token. Note
    # that the cached entry, including its permissions, is returned as is.
    collection_stores = instance_state.incoming_stores[collection_name]
    cached_entry = collection_stores.get(token_representation)
    if cached_entry:
        return cached_entry

    token_store = create_token_store(
        abstract_configuration=abstract_config,
        instance_state=instance_state,
        collection_name=collection_name,
        store_dir=instance_state.store_path / incoming / auth_info.incoming_label,
    )
    entry = (token_store, permissions, auth_info.user_id)
    collection_stores[token_representation] = entry
    return entry
def create_store(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    collection_name: str,
) -> _ModelStore:
    """Create the model store for the curated area of `collection_name`.

    The store directory is the collection's configured `curated` path,
    resolved relative to `instance_state.store_path`. The actual work is
    delegated to `create_token_store`.
    """
    collection_config = abstract_configuration.collections[collection_name]
    curated_dir = instance_state.store_path / collection_config.curated
    return create_token_store(
        abstract_configuration=abstract_configuration,
        instance_state=instance_state,
        collection_name=collection_name,
        store_dir=curated_dir,
    )
def create_token_store(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    store_dir: Path,
) -> _ModelStore:
    """Create a model store for `collection_name` on top of `store_dir`.

    `store_dir` is created if necessary. The backend type (`record_dir` or
    `sqlite`, optionally with the `stl` extension) is taken from the
    collection's backend configuration. With the `stl` extension the backend
    is wrapped in a `SchemaTypeLayer`. The returned `ModelStore` is configured
    with the collection's schema and submission tags.

    Raises `ConfigError` if the configured backend type is not supported.
    """
    from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
    from dump_things_service.abstract_config import get_backend_and_extension
    from dump_things_service.exceptions import ConfigError
    from dump_things_service.store.model_store import ModelStore

    # Background: an early requirement for the service was that curated and
    # incoming stores can live in arbitrary directories, including the case
    # where an incoming store and a curated store use the same directory.
    # Consequently:
    #
    # 1. A collection might have several incoming stores that share the
    #    directory of its curated store.
    #
    # 2. Several collections might share curated or incoming directories.
    #
    # Because of 1., existing backends for a directory should be reused for
    # efficiency and consistency. Because of 2., collections that specify the
    # same backend, i.e., the same directory and the same basic backend type
    # (`record_dir` or `sqlite`), must have matching schemas. If different
    # backend types are used in the same directory, the schemas could in
    # principle differ.
    store_dir.mkdir(parents=True, exist_ok=True)

    collection_config = abstract_configuration.collections[collection_name]
    schema_uri = collection_config.schema_location

    # The backend information comes from the abstract configuration.
    backend_config = collection_config.backend
    backend_name, extension = get_backend_and_extension(backend_config.type)

    if backend_name not in ('record_dir', 'sqlite'):
        # This should not happen because we base our decision on already
        # existing backends.
        msg = f'Unsupported backend type: `{backend_name}`.'
        raise ConfigError(msg)

    if backend_name == 'record_dir':
        backend = create_record_dir_token_store_backend(
            store_dir=store_dir,
            order_by=instance_state.order_by,
            schema_uri=schema_uri,
            mapping_function=backend_config.mapping_method,
            suffix='yaml',
        )
    else:
        backend = create_sqlite_token_store_backend(
            store_dir=store_dir,
            order_by=instance_state.order_by,
        )

    if extension == 'stl':
        backend = SchemaTypeLayer(backend=backend, schema=schema_uri)

    submission_tags = collection_config.submission_tags
    tags = {
        'id': submission_tags.submitter_id_tag,
        'time': submission_tags.submission_time_tag,
    }
    return ModelStore(schema=schema_uri, backend=backend, tags=tags)
def create_record_dir_token_store_backend(
    store_dir: Path,
    order_by: list[str],
    schema_uri: str,
    mapping_function: str,
    suffix: str,
) -> _RecordDirStore:
    """Create a record-dir backend rooted at `store_dir`.

    If `store_dir` does not contain a record-dir configuration file yet, one
    is written with the given `mapping_function` and `schema_uri`. Afterwards
    a `RecordDirStore` is created and asked to build its index if needed.
    """
    from dump_things_service.instance_state import record_dir_config_file_name
    from dump_things_service.backends.record_dir import RecordDirStore

    # Make sure the directory carries a configuration file.
    config_file = store_dir / record_dir_config_file_name
    if not config_file.exists():
        write_record_dir_config(
            path=store_dir,
            mapping_function=mapping_function,
            schema=schema_uri,
        )

    pid_mapping = get_mapping_function_by_name(mapping_function)
    record_dir_store = RecordDirStore(
        root=store_dir,
        pid_mapping_function=pid_mapping,
        suffix=suffix,
        order_by=order_by,
    )
    record_dir_store.build_index_if_needed(schema=schema_uri)
    return record_dir_store
def write_record_dir_config(
    path: Path,
    mapping_function: str,
    schema: str,
):
    """Write a record-dir configuration file into `path` if none exists.

    The file records the schema and the name of the id mapping function
    (`idfx`). An existing configuration file is left untouched.
    """
    from dump_things_service.instance_state import record_dir_config_file_name

    config_file = path / record_dir_config_file_name
    if config_file.exists():
        # Never overwrite an existing configuration.
        return

    config_file.write_text(f"""# RecordDir Config
type: records
version: 1
schema: {schema}
format: yaml
idfx: {mapping_function}
""",
    )
def create_sqlite_token_store_backend(
    store_dir: Path,
    order_by: list[str],
) -> _SQLiteBackend:
    """Create an SQLite backend whose database file lives in `store_dir`.

    The database file name is the `record_file_name` defined by the sqlite
    backend module.
    """
    from dump_things_service.backends.sqlite import (
        SQLiteBackend,
        record_file_name as sqlite_record_file_name,
    )

    database_path = store_dir / sqlite_record_file_name
    return SQLiteBackend(db_path=database_path, order_by=order_by)
def check_bounds(
    length: int | None,
    max_length: int,
    collection: str,
    alternative_url: str
):
    """Reject results that contain more than `max_length` records.

    Raises `HTTPException` (413) if `length` exceeds `max_length`. The error
    detail points the client to the paginated endpoint
    `/{collection}{alternative_url}`.

    `length` may be `None`; in that case there is nothing to compare and the
    check is skipped (presumably `None` means the length is unknown --
    verify against the callers).
    """
    if length is None:
        # Comparing `None` with an int would raise `TypeError`.
        return

    if length > max_length:
        raise HTTPException(
            status_code=HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Too many records found in collection '{collection}'. "
            f'Please use pagination (/{collection}{alternative_url}).',
        )
async def process_token(
    abstract_config: Configuration,
    instance_state: InstanceState,
    api_key: str | None,
    collection: str,
) -> tuple[TokenPermission, _ModelStore]:
    """Resolve a request's API key into permissions and an incoming store.

    Without an API key the default token configuration of `collection` is
    used. The permissions obtained for the token are joined with those of
    the collection's default token.

    Raises `HTTPException` with
    - 401 if no token configuration is found for the API key,
    - 503 if the collection is in maintenance mode and the final permissions
      do not include `curated_read`, `curated_write`, and `zones_access`,
    - 403 if the final permissions allow neither curated nor incoming reads.
    """
    if api_key is None:
        token_config = get_default_token_config(abstract_config, collection)
    else:
        lookup = get_token_config_for_representation_and_collection(
            abstract_config,
            collection_name=collection,
            token_representation=api_key,
        )
        token_config = lookup[1] if lookup else None

    if not token_config:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail=f"invalid token for collection '{collection}'",
        )

    # The user id that is part of the store information is not needed here.
    token_store, token_permissions, _ = get_token_store(
        abstract_config,
        instance_state,
        collection,
        token_config.representation,
    )
    final_permissions = join_default_token_permissions(
        abstract_config, instance_state, token_permissions, collection
    )

    # In maintenance mode only fully privileged tokens may proceed.
    if collection in instance_state.maintenance_mode and not (
        final_permissions.curated_read
        and final_permissions.curated_write
        and final_permissions.zones_access
    ):
        raise HTTPException(
            status_code=HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Collection '{collection}' is in maintenance mode",
        )

    if not (final_permissions.incoming_read or final_permissions.curated_read):
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"No read access to curated or incoming data in collection '{collection}'.",
        )

    return final_permissions, token_store
def get_required_incoming_labels(
    abstract_config: Configuration,
    collection_name: str,
) -> set[str]:
    """Return the incoming labels required for `collection_name`.

    These are the label components of the `(token name, incoming label)`
    pairs reported by `get_required_incoming_info`.
    """
    return {
        label
        for _, label in get_required_incoming_info(
            abstract_config,
            collection_name,
        )
    }
def get_required_incoming_info(
    abstract_config: Configuration,
    collection_name: str,
) -> set[tuple[str, str]]:
    """Return `(token name, incoming label)` pairs for `collection_name`.

    A pair is reported for every configured token that is defined for
    `collection_name` with a mode that grants incoming-write permission.
    """
    required = set()
    for token_name, token_info in abstract_config.tokens.items():
        for name, collection_info in token_info.collections.items():
            if name != collection_name:
                continue
            mode_permissions = mode_mapping[TokenModes(collection_info.mode)]
            if mode_permissions.incoming_write is True:
                required.add((token_name, collection_info.incoming_label))
    return required
def var_escape(
    name: str,
) -> str:
    """Escape underscores and dashes in `name`.

    Every `_` becomes `___` and every `-` becomes `_0_`; all other
    characters are kept.
    """
    # A single translation pass is equivalent to replacing `_` first and `-`
    # afterwards, because neither replacement text contains a `-`.
    escape_table = str.maketrans({'_': '___', '-': '_0_'})
    return name.translate(escape_table)