dump-things-server/dump_things_service/utils.py
2026-02-02 08:32:28 +01:00

475 lines
15 KiB
Python

from __future__ import annotations
import logging
import sys
from contextlib import contextmanager
from functools import reduce
from typing import (
TYPE_CHECKING,
Callable,
)
import fsspec
from fastapi import HTTPException
from rdflib import Graph
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from dump_things_service import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_413_CONTENT_TOO_LARGE,
HTTP_503_SERVICE_UNAVAILABLE,
)
from dump_things_service.auth import (
AuthenticationError,
AuthenticationInfo,
)
from dump_things_service.token import (
TokenPermission,
get_token_parts,
)
if TYPE_CHECKING:
from pathlib import Path
from dump_things_service import JSON
from dump_things_service.backends.record_dir import RecordDirStore
from dump_things_service.backends.sqlite import SQLiteBackend
from dump_things_service.config import InstanceConfig
from dump_things_service.store.model_store import ModelStore
# Logger named after the top-level package rather than this module's
# `__name__`, so records from this module go to the 'dump_things_service'
# logger directly.
logger: logging.Logger = logging.getLogger(name='dump_things_service')
@contextmanager
def sys_path(paths: list[str | Path]):
    """Temporarily replace ``sys.path`` with `paths` (converted to ``str``).

    Inside the ``with`` block, ``sys.path`` is a new list containing exactly
    the entries of `paths`, in order. On exit, whether normal or via an
    exception, the original ``sys.path`` list object is restored.
    """
    original_path = sys.path
    try:
        # Assign a new list instead of mutating in place, so the saved
        # original list stays untouched and can be restored as-is.
        sys.path = [str(path) for path in paths]
        yield
    finally:
        sys.path = original_path
def read_url(url: str) -> str:
    """Return the text content found at `url`.

    The URL is opened through ``fsspec.open`` in text mode (``'rt'``), so any
    protocol fsspec understands can be used.
    """
    with fsspec.open(url, 'rt') as handle:
        content = handle.read()
    return content
def cleaned_json(data: JSON, remove_keys: tuple[str, ...] = ('@type',)) -> JSON:
    """Return a cleaned copy of `data`.

    Every dict, found at any depth through nested dicts and lists, loses its
    keys listed in `remove_keys` and every key whose value is ``None``. List
    items are cleaned recursively but never removed, so ``None`` list items
    stay. Values that are neither dict nor list are returned unchanged.
    """
    if isinstance(data, dict):
        cleaned = {}
        for key, value in data.items():
            if key in remove_keys or value is None:
                continue
            cleaned[key] = cleaned_json(value, remove_keys)
        return cleaned
    if isinstance(data, list):
        return list(map(lambda element: cleaned_json(element, remove_keys), data))
    return data
def combine_ttl(documents: list[str]) -> str:
    """Merge several Turtle documents into one Turtle document.

    Each document is parsed into its own rdflib ``Graph``. The graphs are
    combined pairwise with ``+`` (rdflib's graph union) and the result is
    serialized as Turtle.

    An empty `documents` list yields the serialization of an empty graph.
    Without this special case, ``reduce`` raises ``TypeError`` because it has
    no initial value.
    """
    graphs = [Graph().parse(data=doc, format='ttl') for doc in documents]
    if not graphs:
        return Graph().serialize(format='ttl')
    return reduce(lambda g1, g2: g1 + g2, graphs).serialize(format='ttl')
def get_schema_type_curie(
    instance_config: InstanceConfig,
    collection: str,
    class_name: str,
) -> str:
    """Return the ``class_class_curie`` of class `class_name` in the schema
    module that belongs to `collection`.

    The collection's schema URL (``instance_config.schemas``) selects an entry
    in ``instance_config.conversion_objects``. The ``'schema_module'`` of that
    entry is expected to define `class_name`.
    """
    url_of_schema = instance_config.schemas[collection]
    conversion_entry = instance_config.conversion_objects[url_of_schema]
    module = conversion_entry['schema_module']
    schema_class = getattr(module, class_name)
    return schema_class.class_class_curie
@contextmanager
def wrap_http_exception(
    exception_class: type[BaseException] = ValueError,
    status_code: int = HTTP_400_BAD_REQUEST,
    header: str = ''
):
    """Turn `exception_class` errors raised inside the ``with`` block into
    ``HTTPException``s.

    The raised ``HTTPException`` uses `status_code`. Its detail is the original
    error text, prefixed with ``'<header>: '`` when `header` is non-empty. The
    original exception is chained as the cause. Other exceptions pass through
    untouched.
    """
    try:
        yield
    except exception_class as error:
        if header:
            message = f'{header}: {error}'
        else:
            message = str(error)
        raise HTTPException(status_code=status_code, detail=message) from error
def join_default_token_permissions(
    instance_config: InstanceConfig,
    permissions: TokenPermission,
    collection: str,
) -> TokenPermission:
    """Return the union of `permissions` and the collection's default-token
    permissions.

    The default token is ``instance_config.collections[collection].default_token``.
    Its permissions are read from
    ``instance_config.tokens[collection][<name>]['permissions']``. Each flag
    of the result is the OR of the two inputs. Neither input is modified.
    """
    default_token_name = instance_config.collections[collection].default_token
    default_token_permissions = instance_config.tokens[collection][default_token_name]['permissions']

    result = TokenPermission()
    result.curated_read = (
        permissions.curated_read | default_token_permissions.curated_read
    )
    result.incoming_read = (
        permissions.incoming_read | default_token_permissions.incoming_read
    )
    result.incoming_write = (
        permissions.incoming_write | default_token_permissions.incoming_write
    )
    # These flags must be carried over as well. `process_token` checks
    # `curated_write` and `zones_access` on the joined result to decide
    # whether a token may bypass maintenance mode. Leaving them at the
    # `TokenPermission()` defaults would discard the token's own grants.
    result.curated_write = (
        permissions.curated_write | default_token_permissions.curated_write
    )
    result.zones_access = (
        permissions.zones_access | default_token_permissions.zones_access
    )
    return result
def check_collection(
    instance_config: InstanceConfig,
    collection: str,
):
    """Ensure `collection` is one of ``instance_config.collections``.

    Returns ``None`` when it is. Otherwise raises an ``HTTPException`` with
    status 404.
    """
    if collection in instance_config.collections:
        return
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection}'.",
    )
def check_label(
    instance_config: InstanceConfig,
    collection: str,
    label: str,
):
    """Raise HTTP 404 unless `label` is a known incoming label of `collection`.

    A label is known if it is the incoming label of one of the collection's
    configured tokens, or if a directory with that name exists in the
    collection's incoming area. The directory listing is consulted only when
    the configuration does not already know the label.
    """
    if label in get_config_labels(instance_config, collection):
        return
    if label in get_on_disk_labels(instance_config, collection):
        return
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No incoming label: '{label}' in collection: '{collection}'.",
    )
def get_config_labels(
    instance_config: InstanceConfig,
    collection: str,
) -> set[str]:
    """Return the non-empty ``incoming_label`` values of all tokens configured
    for `collection`.

    Raises HTTP 404 (via ``check_collection``) for an unknown collection.
    """
    check_collection(instance_config, collection)
    labels: set[str] = set()
    for token_info in instance_config.tokens[collection].values():
        incoming_label = token_info['incoming_label']
        if incoming_label != '':
            labels.add(incoming_label)
    return labels
def get_on_disk_labels(
    instance_config: InstanceConfig,
    collection: str,
) -> set[str]:
    """Return the names of the subdirectories in `collection`'s incoming area.

    The incoming area is ``instance_config.store_path`` joined with the
    collection's ``incoming`` setting. An empty set is returned when no
    incoming directory is configured or when it does not exist yet.

    Raises HTTP 404 (via ``check_collection``) for an unknown collection.
    """
    check_collection(instance_config, collection)
    incoming = instance_config.collections[collection].incoming
    # Test the setting itself, not the joined path. A `Path` is always truthy.
    # `store_path / ''` is `store_path` itself, so an empty setting would list
    # the store root as labels, and `store_path / None` would raise TypeError.
    if not incoming:
        return set()
    incoming_path = instance_config.store_path / incoming
    if not incoming_path.exists():
        return set()
    return {
        path.name
        for path in incoming_path.iterdir()
        if path.is_dir()
    }
def get_default_token_name(
    instance_config: InstanceConfig,
    collection: str
) -> str:
    """Return the name of the default token configured for `collection`.

    Raises HTTP 404 (via ``check_collection``) for an unknown collection.
    """
    check_collection(instance_config, collection)
    collection_config = instance_config.collections[collection]
    return collection_config.default_token
async def process_token(
    instance_config: InstanceConfig,
    api_key: str,
    collection: str,
) -> tuple[TokenPermission, ModelStore]:
    """Resolve permissions and token store for a request on `collection`.

    If `api_key` is ``None``, the collection's default token is used instead.
    The token's permissions are joined with the default-token permissions.

    Returns:
        ``(joined_permissions, token_store)``. ``token_store`` is whatever
        ``get_token_store`` returned for the token.

    Raises:
        HTTPException: 503 if the collection is in maintenance mode and the
            joined permissions lack any of curated_read, curated_write or
            zones_access. 403 if neither incoming nor curated read access is
            granted.
    """
    if api_key is None:
        token = get_default_token_name(instance_config, collection)
    else:
        token = api_key

    token_store, _hashed_token, token_permissions, _user_id = get_token_store(
        instance_config,
        collection,
        token,
    )
    permissions = join_default_token_permissions(
        instance_config, token_permissions, collection
    )

    # During maintenance only tokens with full curated access get through.
    if collection in instance_config.maintenance_mode and (
        not permissions.curated_read
        or not permissions.curated_write
        or not permissions.zones_access
    ):
        raise HTTPException(
            status_code=HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Collection '{collection}' is in maintenance mode",
        )

    if not (permissions.incoming_read or permissions.curated_read):
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"No read access to curated or incoming data in collection '{collection}'.",
        )

    return permissions, token_store
def resolve_hashed_token(
    instance_config: InstanceConfig,
    collection_name: str,
    token: str,
) -> str:
    """Map a plain-text token to the key under which its info is stored.

    Tokens without a ``'-'`` are returned unchanged. For tokens containing a
    ``'-'``, the first part returned by ``get_token_parts`` is looked up in
    ``instance_config.hashed_tokens[collection_name]``. The hashed value is
    returned if found, otherwise the token itself.
    """
    if '-' not in token:
        return token
    hashed_lookup = instance_config.hashed_tokens[collection_name]
    return hashed_lookup.get(get_token_parts(token)[0], token)
def authenticate_token(
    instance_config: InstanceConfig,
    collection_name: str,
    plain_token: str,
) -> AuthenticationInfo:
    """Authenticate `plain_token` with the collection's auth providers.

    Providers from ``instance_config.auth_providers[collection_name]`` are
    tried in order. The first one that does not raise ``AuthenticationError``
    decides the outcome. If its result is truthy, that result is returned.

    Raises:
        HTTPException: 401 if every provider failed, if its result is falsy,
            or if there are no providers. The detail lists each provider's
            failure message.
    """
    failures: list[str] = []
    auth_info = None
    for provider in instance_config.auth_providers[collection_name]:
        logger.debug('trying to authenticate with %s', provider)
        try:
            auth_info = provider.authenticate(plain_token)
        except AuthenticationError as error:
            logger.debug(
                'Authentication provider %s could not '
                'authenticate token for collection %s: %s',
                provider,
                collection_name,
                str(error),
            )
            failures.append(f'{provider.__class__.__name__} failed with: {error}')
        else:
            break

    if auth_info:
        return auth_info

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=f'invalid token for collection {collection_name}: ' + ', '.join(failures),
    )
def get_token_store(
    instance_config: InstanceConfig,
    collection_name: str,
    plain_token: str
) -> tuple[ModelStore, str, TokenPermission, str] | tuple[None, None, None, None]:
    """Authenticate `plain_token` and return its store information.

    Returns ``(store, hashed_token, permissions, user_id)`` and records the
    same tuple in ``instance_config.token_stores[collection_name][plain_token]``.
    ``store`` is ``None`` when the token has neither incoming-read nor
    incoming-write permission. Otherwise it is the model store for the
    token's incoming label directory.

    Permissions and user id always come from the current authentication.
    A previously recorded tuple is never returned, because it would carry the
    permissions and label of an earlier authentication.

    Raises:
        HTTPException: 404 for an unknown collection, 401 for an invalid token,
            or 401 if the token needs an incoming store but the collection has
            no incoming area.
    """
    check_collection(instance_config, collection_name)

    # Try to authenticate the token with the authentication providers that
    # are associated with the collection.
    auth_info = authenticate_token(instance_config, collection_name, plain_token)
    permissions = auth_info.token_permission

    # Token info is associated with the hashed version of the token, if any.
    hashed_token = resolve_hashed_token(
        instance_config,
        collection_name,
        plain_token,
    )

    if permissions.incoming_read or permissions.incoming_write:
        incoming = instance_config.incoming.get(collection_name)
        if not incoming:
            raise HTTPException(
                status_code=HTTP_401_UNAUTHORIZED,
                detail='No incoming area for collection ' + collection_name
            )
        # `create_token_store` memoizes stores per directory in
        # `instance_config.all_stores`, so an existing store is reused here.
        # It does not recreate the store.
        store_dir = instance_config.store_path / incoming / auth_info.incoming_label
        token_store = create_token_store(
            instance_config=instance_config,
            collection_name=collection_name,
            store_dir=store_dir,
        )
    else:
        # Without incoming-read or incoming-write permission no store is needed.
        token_store = None

    store_info = (
        token_store,
        hashed_token,
        permissions,
        auth_info.user_id,
    )
    instance_config.token_stores[collection_name][plain_token] = store_info
    return store_info
def create_token_store(
    instance_config: InstanceConfig,
    collection_name: str,
    store_dir: Path,
) -> ModelStore:
    """Return the model store for the incoming directory `store_dir`.

    Stores are memoized in ``instance_config.all_stores``, keyed by
    `store_dir`, with values ``(collection_name, model_store)``. A second call
    for the same directory returns the existing store. A new store uses the
    same backend kind (and ``stl`` extension) as the collection's curated
    store.

    Raises:
        HTTPException: 500 if `store_dir` is already used by another collection
            whose schema differs from that of `collection_name`.
        ConfigError: if the curated backend is neither ``record_dir`` nor
            ``sqlite``.
    """
    from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
    from dump_things_service.config import (
        ConfigError,
        get_backend_and_extension,
    )
    from dump_things_service.store.model_store import ModelStore

    # Reuse an existing store for this directory. Refuse to share a directory
    # between collections whose schemas differ.
    if store_dir in instance_config.all_stores:
        existing_collection_name, existing_model_store = instance_config.all_stores[store_dir]
        if (
            existing_collection_name != collection_name
            and instance_config.schemas[existing_collection_name] != instance_config.schemas[collection_name]
        ):
            msg = (
                f"collections '{existing_collection_name}' and "
                f"'{collection_name}' with different schemas map onto the same"
                f" storage directory: '<incoming_path>/{store_dir.name}'"
            )
            raise HTTPException(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                detail=msg,
            )
        return existing_model_store

    schema_uri = instance_config.schemas[collection_name]

    # We get the backend information from the curated store.
    backend_type = instance_config.backend[collection_name].type
    backend_name, extension = get_backend_and_extension(backend_type)
    if backend_name not in ('record_dir', 'sqlite'):
        # This should not happen because we base our decision on already
        # existing backends. The check runs before `mkdir`, so no directory is
        # created for an unusable configuration.
        msg = f'Unsupported backend type: `{backend_type}`.'
        raise ConfigError(msg)

    # The configuration routines have read the backend configuration of the
    # curated store from disk and stored it in `instance_config`.
    # With the `stl` extension the curated backend is wrapped in a layer whose
    # `.backend` is the real backend, just as the token store is wrapped
    # below. Unwrap it for both backend kinds so that `order_by` etc. are read
    # from the real backend.
    curated_backend = instance_config.curated_stores[collection_name].backend
    if extension == 'stl':
        curated_backend = curated_backend.backend

    store_dir.mkdir(parents=True, exist_ok=True)

    if backend_name == 'record_dir':
        token_store = create_record_dir_token_store(
            store_dir=store_dir,
            order_by=curated_backend.order_by,
            schema_uri=schema_uri,
            mapping_function=curated_backend.pid_mapping_function,
            suffix=curated_backend.suffix,
        )
    else:
        token_store = create_sqlite_token_store(
            store_dir=store_dir,
            order_by=curated_backend.order_by,
        )

    if extension == 'stl':
        token_store = SchemaTypeLayer(backend=token_store, schema=schema_uri)

    submission_tags = instance_config.collections[collection_name].submission_tags
    tags = {
        'id': submission_tags.submitter_id_tag,
        'time': submission_tags.submission_time_tag,
    }
    model_store = ModelStore(backend=token_store, schema=schema_uri, tags=tags)
    instance_config.all_stores[store_dir] = (collection_name, model_store)
    return model_store
def create_record_dir_token_store(
    store_dir: Path,
    order_by: list[str],
    schema_uri: str,
    mapping_function: Callable,
    suffix: str,
) -> RecordDirStore:
    """Create a ``RecordDirStore`` rooted at `store_dir` and build its index.

    `mapping_function`, `suffix` and `order_by` are passed to the store
    unchanged. ``build_index_if_needed`` is called with `schema_uri` before the
    store is returned.
    """
    from dump_things_service.backends.record_dir import RecordDirStore

    record_store = RecordDirStore(
        root=store_dir,
        order_by=order_by,
        suffix=suffix,
        pid_mapping_function=mapping_function,
    )
    record_store.build_index_if_needed(schema=schema_uri)
    return record_store
def create_sqlite_token_store(
    store_dir: Path,
    order_by: list[str],
) -> SQLiteBackend:
    """Create an ``SQLiteBackend`` whose database file lives in `store_dir`.

    The file name is the sqlite backend module's ``record_file_name``.
    """
    from dump_things_service.backends.sqlite import (
        SQLiteBackend,
        record_file_name as sqlite_record_file_name,
    )

    database_path = store_dir / sqlite_record_file_name
    return SQLiteBackend(db_path=database_path, order_by=order_by)
def check_bounds(
    length: int | None,
    max_length: int,
    collection: str,
    alternative_url: str
):
    """Raise HTTP 413 if a result of `length` records exceeds `max_length`.

    The error detail suggests the paginated endpoint
    ``/<collection><alternative_url>``. A `length` of ``None`` (unknown) is
    accepted without a check, as the ``int | None`` annotation allows.
    Comparing ``None`` with an int would raise ``TypeError``.
    """
    if length is None:
        return
    if length > max_length:
        raise HTTPException(
            status_code=HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Too many records found in collection '{collection}'. "
            f'Please use pagination (/{collection}{alternative_url}).',
        )