"""
To speed up processing, multiple indices could be introduced, e.g.:

- token representation -> token name
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import logging
|
|
import sys
|
|
from contextlib import contextmanager
|
|
from functools import reduce
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Callable,
|
|
Iterable,
|
|
)
|
|
|
|
import fsspec
|
|
from fastapi import HTTPException
|
|
from rdflib import Graph
|
|
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
|
|
|
|
from dump_things_service import (
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_401_UNAUTHORIZED,
|
|
HTTP_403_FORBIDDEN,
|
|
HTTP_404_NOT_FOUND,
|
|
HTTP_413_CONTENT_TOO_LARGE,
|
|
HTTP_503_SERVICE_UNAVAILABLE,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
Configuration,
|
|
TokenModes,
|
|
TokenPermission,
|
|
mode_mapping,
|
|
get_default_token_config,
|
|
get_token_config_for_representation_and_collection,
|
|
)
|
|
from dump_things_service.auth import (
|
|
AuthenticationError,
|
|
AuthenticationInfo,
|
|
)
|
|
from dump_things_service.api_token import get_token_parts
|
|
from dump_things_service.abstract_config import check_collection
|
|
|
|
if TYPE_CHECKING:
|
|
from pathlib import Path
|
|
|
|
from dump_things_service import JSON
|
|
from dump_things_service.backends.record_dir import _RecordDirStore
|
|
from dump_things_service.backends.sqlite import SQLiteBackend
|
|
from dump_things_service.instance_state import InstanceState
|
|
from dump_things_service.store.model_store import _ModelStore
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
|
|
|
|
@contextmanager
def sys_path(paths: list[str | Path]):
    """Temporarily replace ``sys.path`` with the string forms of `paths`.

    The previous ``sys.path`` object is restored on exit, even if the body
    raises.  (The old docstring incorrectly claimed this patched the
    ``Path`` class; the code only swaps ``sys.path``.)

    :param paths: path-like objects that become the complete ``sys.path``
        (in order) while the context is active.
    """
    original_path = sys.path
    try:
        # Replace, don't extend: imports inside the context resolve only
        # against `paths`.
        sys.path = [str(path) for path in paths]
        yield
    finally:
        sys.path = original_path
|
|
|
|
|
|
def read_url(url: str) -> str:
    """Read the content of an URL into memory and return it as text."""
    # fsspec returns an OpenFile that is itself a context manager.
    with fsspec.open(url, 'rt') as f:
        return f.read()
|
|
|
|
|
|
def cleaned_json(data: JSON, remove_keys: tuple[str, ...] = ('@type',)) -> JSON:
    """Recursively strip unwanted entries from a JSON-like structure.

    Dict entries are dropped when their key is in `remove_keys` or their
    value is ``None``.  Lists are processed element-wise (``None`` elements
    inside lists are kept).  Scalars are returned unchanged.
    """
    if isinstance(data, dict):
        cleaned = {}
        for key, value in data.items():
            if key in remove_keys or value is None:
                continue
            cleaned[key] = cleaned_json(value, remove_keys)
        return cleaned
    if isinstance(data, list):
        return [cleaned_json(element, remove_keys) for element in data]
    return data
|
|
|
|
|
|
def combine_ttl(documents: list[str]) -> str:
    """Merge several Turtle documents into a single serialized Turtle graph."""
    parsed_graphs = [
        Graph().parse(data=document, format='ttl')
        for document in documents
    ]
    # No initializer: an empty `documents` raises, exactly as before.
    combined = reduce(lambda left, right: left + right, parsed_graphs)
    return combined.serialize(format='ttl')
|
|
|
|
|
|
def get_schema_type_curie(
    instance_state: InstanceState,
    collection: str,
    class_name: str,
) -> str:
    """Return the CURIE of class `class_name` from `collection`'s schema module."""
    schema_url = instance_state.schemas[collection]
    module = instance_state.conversion_objects[schema_url]['schema_module']
    return getattr(module, class_name).class_class_curie
|
|
|
|
|
|
@contextmanager
def wrap_http_exception(
    exception_class: type[BaseException] = ValueError,
    status_code: int = HTTP_400_BAD_REQUEST,
    header: str = ''
):
    """Re-raise exceptions of class `exception_class` as `HTTPException`.

    `header`, when non-empty, is prefixed to the exception text in the
    HTTP detail message.
    """
    try:
        yield
    except exception_class as error:
        if header:
            detail = f'{header}: {error}'
        else:
            detail = str(error)
        raise HTTPException(status_code=status_code, detail=detail) from error
|
|
|
|
|
|
def join_default_token_permissions(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    permissions: TokenPermission,
    collection: str,
) -> TokenPermission:
    """Join `permissions` with the default-token permissions of `collection`.

    Only the read/incoming flags are joined (or-ed); all other flags of the
    result keep their `TokenPermission()` defaults.  `instance_state` is
    currently unused but kept for interface compatibility.
    """
    joined = TokenPermission()

    # A collection may name a default token that does not (yet) exist; this
    # inconsistency is tolerated so that collections and tokens can be
    # created independently.  In that case no rights are granted.
    default_name = abstract_configuration.collections[collection].default_token
    if default_name not in abstract_configuration.tokens:
        return joined

    default_mode = abstract_configuration.tokens[default_name].collections[collection].mode
    defaults = mode_mapping[TokenModes(default_mode)]

    joined.curated_read = permissions.curated_read | defaults.curated_read
    joined.incoming_read = permissions.incoming_read | defaults.incoming_read
    joined.incoming_write = permissions.incoming_write | defaults.incoming_write
    return joined
|
|
|
|
|
|
def get_on_disk_labels(
    store_path: Path,
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the names of incoming-label directories on disk for `collection`."""
    check_collection(abstract_config, collection)

    incoming_dir = store_path / abstract_config.collections[collection].incoming
    if not incoming_dir or not incoming_dir.exists():
        return set()

    labels = set()
    for entry in incoming_dir.iterdir():
        if entry.is_dir():
            labels.add(entry.name)
    return labels
|
|
|
|
|
|
def resolve_hashed_token(
    instance_state: InstanceState,
    collection_name: str,
    token: str,
) -> str:
    """Map a token onto its stored hashed representation, if one exists.

    Tokens containing a ``-`` are looked up (by their first part) in the
    collection's hashed-token table; the hashed value is returned when
    present, otherwise the token is returned unchanged.
    """
    if '-' not in token:
        return token
    lookup_key = get_token_parts(token)[0]
    return instance_state.hashed_tokens[collection_name].get(lookup_key, token)
|
|
|
|
|
|
def authenticate_token(
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> AuthenticationInfo:
    """Authenticate a token against the collection's authentication sources.

    The sources are tried in order; the first successful authentication
    wins.  If none succeeds, an HTTP 401 is raised whose detail lists the
    individual provider failures.
    """
    auth_info = None
    failure_messages = []
    for provider in instance_state.auth_sources[collection_name]:
        try:
            logger.debug('trying to authenticate with %s', provider)
            auth_info = provider.authenticate(token_representation)
        except AuthenticationError as error:
            logger.debug(
                'Authentication provider %s could not '
                'authenticate token for collection %s: %s',
                provider,
                collection_name,
                str(error),
            )
            failure_messages.append(f'{provider.__class__.__name__} failed with: {error}')
            continue
        break

    if auth_info:
        return auth_info

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=f'invalid token for collection {collection_name}: ' + ', '.join(
            failure_messages,
        ),
    )
|
|
|
|
|
|
def get_token_store(
    abstract_config: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> tuple[_ModelStore | None, TokenPermission, str]:
    """Return ``(store, permissions, user_id)`` for a token and collection.

    The token is authenticated first (raising HTTP 401 on failure).  Tokens
    without incoming-read/write permissions get ``None`` as their store.
    Results are cached per collection and token representation in
    ``instance_state.incoming_stores``.

    Note: the return annotation previously declared a 4-tuple of ``None``
    for the store-less case, but the code always returns a 3-tuple
    ``(None, permissions, user_id)``; the annotation is corrected here.

    :raises HTTPException: 401 if authentication fails or the collection
        has no incoming area configured.
    """
    # Try to authenticate the token with the authentication providers that
    # are associated with the collection.
    auth_info = authenticate_token(
        instance_state,
        collection_name,
        token_representation,
    )
    permissions = auth_info.token_permission

    # If the token has no incoming-read or incoming-write permissions, we do not
    # need to create a store.
    if not permissions.incoming_read and not permissions.incoming_write:
        instance_state.incoming_stores[collection_name][token_representation] = (
            None,
            permissions,
            auth_info.user_id,
        )
        return instance_state.incoming_stores[collection_name][token_representation]

    # Check whether the collection has an incoming definition
    incoming = abstract_config.collections[collection_name].incoming
    if not incoming:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='No incoming area for collection ' + collection_name
        )

    # Check whether a store for this collection and token does already exist.
    store_info = instance_state.incoming_stores[collection_name].get(token_representation)
    if store_info:
        return store_info

    store_dir = instance_state.store_path / incoming / auth_info.incoming_label
    token_store = create_token_store(
        abstract_configuration=abstract_config,
        instance_state=instance_state,
        collection_name=collection_name,
        store_dir=store_dir,
    )

    instance_state.incoming_stores[collection_name][token_representation] = (
        token_store,
        permissions,
        auth_info.user_id,
    )
    return instance_state.incoming_stores[collection_name][token_representation]
|
|
|
|
|
|
def create_token_store(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    store_dir: Path,
) -> _ModelStore:
    """Create (or reuse) the model store for an incoming area at `store_dir`.

    Stores are cached in ``instance_state.all_stores`` keyed on the storage
    directory, so two collections that map onto the same directory share one
    store — but only if they use the same schema; otherwise an HTTP 500 is
    raised.  The concrete backend (record_dir or sqlite) mirrors the backend
    of the collection's curated store.

    :raises HTTPException: 500 when two collections with different schemas
        map onto the same storage directory.
    :raises ConfigError: for an unsupported backend type (should not occur).
    """
    # Local imports avoid import cycles with the backend/store modules.
    from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
    from dump_things_service.abstract_config import get_backend_and_extension
    from dump_things_service.exceptions import ConfigError
    from dump_things_service.store.model_store import ModelStore

    # Check if the store was already created and if it was created for the
    # same schema.
    if store_dir in instance_state.all_stores:
        existing_collection_name, existing_model_store = instance_state.all_stores[store_dir]
        if (
            existing_collection_name != collection_name
            and instance_state.schemas[existing_collection_name] != instance_state.schemas[collection_name]
        ):
            msg = (
                f"collections '{existing_collection_name}' and "
                f"'{collection_name}' with different schemas map onto the same"
                f" storage directory: '<incoming_path>/{store_dir.name}'"
            )
            raise HTTPException(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                detail=msg,
            )
        # Same directory, compatible schema: share the existing store.
        return existing_model_store

    store_dir.mkdir(parents=True, exist_ok=True)

    schema_uri = abstract_configuration.collections[collection_name].schema

    # We get the backend information from the abstract configuration
    backend_type = abstract_configuration.collections[collection_name].backend.type
    backend_name, extension = get_backend_and_extension(backend_type)

    backend = instance_state.curated_stores[collection_name].backend
    if backend_name == 'record_dir':
        # The configuration routines have read the backend configuration of the
        # curated store from disk and stored it in `instance_state`. We fetch
        # it from there.
        if extension == 'stl':
            # An 'stl' extension wraps the real backend in a schema-type
            # layer; unwrap it to reach the record_dir configuration.
            backend = backend.backend

        token_store = create_record_dir_token_store(
            store_dir=store_dir,
            order_by=backend.order_by,
            schema_uri=schema_uri,
            mapping_function=backend.pid_mapping_function,
            suffix=backend.suffix,
        )
    elif backend_name == 'sqlite':
        token_store = create_sqlite_token_store(
            store_dir=store_dir,
            order_by=backend.order_by,
        )
    else:
        # This should not happen because we base our decision on already
        # existing backends.
        msg = f'Unsupported backend type: `{backend_type}`.'
        raise ConfigError(msg)

    # Re-apply the schema-type layer on the new token store when configured.
    if extension == 'stl':
        token_store = SchemaTypeLayer(backend=token_store, schema=schema_uri)

    # Tags recorded with each submission (submitter id and submission time).
    submission_tags = abstract_configuration.collections[collection_name].submission_tags
    tags = {
        'id': submission_tags.submitter_id_tag,
        'time': submission_tags.submission_time_tag,
    }
    model_store = ModelStore(backend=token_store, schema=schema_uri, tags=tags)
    instance_state.all_stores[store_dir] = (collection_name, model_store)

    return model_store
|
|
|
|
|
|
def create_record_dir_token_store(
    store_dir: Path,
    order_by: list[str],
    schema_uri: str,
    mapping_function: Callable,
    suffix: str,
) -> _RecordDirStore:
    """Create a record-dir backend rooted at `store_dir` and ensure its index."""
    from dump_things_service.backends.record_dir import RecordDirStore

    record_store = RecordDirStore(
        root=store_dir,
        pid_mapping_function=mapping_function,
        suffix=suffix,
        order_by=order_by,
    )
    record_store.build_index_if_needed(schema=schema_uri)
    return record_store
|
|
|
|
|
|
def create_sqlite_token_store(
    store_dir: Path,
    order_by: list[str],
) -> SQLiteBackend:
    """Create a SQLite backend whose database file lives inside `store_dir`."""
    from dump_things_service.backends.sqlite import (
        SQLiteBackend,
        record_file_name as sqlite_record_file_name,
    )

    database_path = store_dir / sqlite_record_file_name
    return SQLiteBackend(db_path=database_path, order_by=order_by)
|
|
|
|
|
|
def check_bounds(
    length: int | None,
    max_length: int,
    collection: str,
    alternative_url: str
):
    """Raise HTTP 413 when a result set exceeds `max_length`.

    `length` may be ``None`` (unknown count per the annotation); in that
    case nothing is checked.  The previous implementation compared
    ``None > max_length`` and raised ``TypeError``.

    :raises HTTPException: 413 with a hint to use the paginated endpoint.
    """
    if length is not None and length > max_length:
        raise HTTPException(
            status_code=HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Too many records found in collection '{collection}'. "
            f'Please use pagination (/{collection}{alternative_url}).',
        )
|
|
|
|
|
|
async def process_token(
    abstract_config: Configuration,
    instance_state: InstanceState,
    api_key: str | None,
    collection: str,
) -> tuple[TokenPermission, _ModelStore]:
    """Resolve a token (or the collection default) into permissions and a store.

    When `api_key` is ``None``, the collection's default token configuration
    is used.  The token's own permissions are joined with the default-token
    permissions before access checks.

    :raises HTTPException:
        401 for an invalid token, 503 when the collection is in maintenance
        mode and the token lacks full curated + zones access, 403 when the
        token has neither curated nor incoming read access.
    """
    token_config = (
        get_default_token_config(abstract_config, collection)
        if api_key is None
        else get_token_config_for_representation_and_collection(
            abstract_config,
            collection_name=collection,
            token_representation=api_key,
        )[1]
    )

    if not token_config:
        # Plain string (the previous `f'invalid token'` had no placeholders).
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='invalid token',
        )

    token_store, token_permissions, user_id = get_token_store(
        abstract_config,
        instance_state,
        collection,
        api_key,
    )
    final_permissions = join_default_token_permissions(
        abstract_config, instance_state, token_permissions, collection
    )

    # Check for maintenance mode: only tokens with full curated access and
    # zones access may operate on a collection under maintenance.
    if collection in instance_state.maintenance_mode:
        if not (
            final_permissions.curated_read
            and final_permissions.curated_write
            and final_permissions.zones_access
        ):
            raise HTTPException(
                status_code=HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"Collection '{collection}' is in maintenance mode",
            )

    if not final_permissions.incoming_read and not final_permissions.curated_read:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"No read access to curated or incoming data in collection '{collection}'.",
        )
    return final_permissions, token_store
|
|
|
|
|
|
def get_required_incoming_labels(
    abstract_config: Configuration,
    collection_name: str,
) -> set[str]:
    """Return the incoming labels of tokens with write access to `collection_name`."""
    return {
        label
        for _token_name, label in get_required_incoming_info(
            abstract_config,
            collection_name,
        )
    }
|
|
|
|
|
|
def get_required_incoming_info(
    abstract_config: Configuration,
    collection_name: str,
) -> set[tuple[str, str]]:
    """Return ``(token_name, incoming_label)`` for every token whose mode
    grants incoming-write access to `collection_name`."""
    result = set()
    for token_name, token_info in abstract_config.tokens.items():
        for configured_name, collection_info in token_info.collections.items():
            if configured_name != collection_name:
                continue
            token_mode = TokenModes(collection_info.mode)
            if mode_mapping[token_mode].incoming_write is True:
                result.add((token_name, collection_info.incoming_label))
    return result
|