When a record-dir backend is created on a directory, check whether the config file exists and write it if it does not.
507 lines
16 KiB
Python
507 lines
16 KiB
Python
"""
|
|
|
|
|
|
To speed up processing, multiple indices could be introduced, e.g.:
|
|
|
|
- token representation -> token name
|
|
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sys
|
|
from contextlib import contextmanager
|
|
from functools import reduce
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Callable,
|
|
)
|
|
|
|
import fsspec
|
|
from fastapi import HTTPException
|
|
from rdflib import Graph
|
|
|
|
from dump_things_service import (
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_401_UNAUTHORIZED,
|
|
HTTP_403_FORBIDDEN,
|
|
HTTP_413_CONTENT_TOO_LARGE,
|
|
HTTP_503_SERVICE_UNAVAILABLE,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
Configuration,
|
|
RecordDirBackendConfig,
|
|
TokenModes,
|
|
TokenPermission,
|
|
mode_mapping,
|
|
check_collection,
|
|
get_default_token_config,
|
|
get_token_config_for_representation_and_collection,
|
|
get_mapping_function_by_name,
|
|
)
|
|
from dump_things_service.auth import (
|
|
AuthenticationError,
|
|
AuthenticationInfo,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from pathlib import Path
|
|
|
|
from dump_things_service import JSON
|
|
from dump_things_service.backends.record_dir import _RecordDirStore
|
|
from dump_things_service.backends.sqlite import _SQLiteBackend
|
|
from dump_things_service.instance_state import InstanceState
|
|
from dump_things_service.store.model_store import _ModelStore
|
|
|
|
|
|
# Module logger. It uses the fixed name 'dump_things_service' instead of
# ``__name__``, presumably so that all service modules share one logger --
# confirm before renaming.
logger = logging.getLogger('dump_things_service')
|
|
|
|
|
|
@contextmanager
def sys_path(paths: list[str | Path]):
    """Temporarily replace ``sys.path`` with the given paths.

    While the ``with`` block runs, ``sys.path`` is a new list that contains
    ``str(path)`` for every entry of `paths`, in order. On exit the original
    ``sys.path`` list object is restored, also when the block raises.

    :param paths: directories (strings or ``Path`` objects) that form the
        complete module search path while the context is active.
    """
    original_path = sys.path
    # Build the replacement before swapping, so that a failing ``str()``
    # conversion can never leave ``sys.path`` half-modified.
    replacement = [str(path) for path in paths]
    try:
        sys.path = replacement
        yield
    finally:
        sys.path = original_path
|
|
|
|
|
|
def read_url(url: str) -> str:
    """Return the complete text content of the resource at `url`.

    The resource is opened with ``fsspec.open`` in text mode (``'rt'``), so
    presumably any URL scheme supported by fsspec can be used.
    """
    with fsspec.open(url, 'rt') as stream:
        content = stream.read()
    return content
|
|
|
|
|
|
def cleaned_json(data: JSON, remove_keys: tuple[str, ...] = ('@type',)) -> JSON:
    """Return a copy of `data` without unwanted keys and ``None`` values.

    The structure is traversed recursively. In every dictionary, entries
    whose key is in `remove_keys` or whose value is ``None`` are dropped.
    Lists are rebuilt element by element (``None`` list items are kept).
    Any other value is returned unchanged.

    :param data: JSON-like structure of dicts, lists and scalars.
    :param remove_keys: keys to drop from every dictionary.
    :return: the cleaned structure; containers are new objects.
    """
    if isinstance(data, dict):
        cleaned = {}
        for key, value in data.items():
            if key in remove_keys or value is None:
                continue
            cleaned[key] = cleaned_json(value, remove_keys)
        return cleaned
    if isinstance(data, list):
        return [cleaned_json(element, remove_keys) for element in data]
    return data
|
|
|
|
|
|
def combine_ttl(documents: list[str]) -> str:
    """Merge several Turtle documents into one Turtle document.

    Every document is parsed into its own ``rdflib.Graph``. The graphs are
    combined with ``+`` (graph union), and the result is serialized as
    Turtle.

    :param documents: Turtle-serialized RDF documents.
    :return: Turtle serialization of the union of all documents. An empty
        `documents` list yields the serialization of an empty graph.
    """
    graphs = [Graph().parse(data=doc, format='ttl') for doc in documents]
    if not graphs:
        # ``reduce`` without an initial value raises TypeError on an empty
        # sequence; an empty input simply means an empty graph.
        return Graph().serialize(format='ttl')
    return reduce(lambda g1, g2: g1 + g2, graphs).serialize(format='ttl')
|
|
|
|
|
|
@contextmanager
def wrap_http_exception(
    exception_class: type[BaseException] = ValueError,
    status_code: int = HTTP_400_BAD_REQUEST,
    header: str = ''
):
    """Translate exceptions raised in the managed block into ``HTTPException``.

    An exception that is an instance of `exception_class` is re-raised as a
    FastAPI ``HTTPException`` with `status_code`. The detail is
    ``'<header>: <exception>'`` if `header` is non-empty, otherwise the
    exception's string form. The original exception is chained as the
    cause. Exceptions of other types propagate unchanged.
    """
    try:
        yield
    except exception_class as error:
        message = str(error)
        if header:
            message = f'{header}: {message}'
        raise HTTPException(status_code=status_code, detail=message) from error
|
|
|
|
|
|
def join_default_token_permissions(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    permissions: TokenPermission,
    collection: str,
) -> TokenPermission:
    """Combine `permissions` with the rights of the collection's default token.

    Returns a copy (``model_copy()``) of `permissions`. In the copy,
    ``curated_read``, ``incoming_read`` and ``incoming_write`` are OR-ed with
    the corresponding values that ``mode_mapping`` assigns to the default
    token's mode for `collection`. All other fields stay as copied.

    The unmodified copy is returned in two cases:

    * the collection's default token is not a configured token, or
    * that token has no entry for `collection`.

    Both inconsistencies are tolerated on purpose, so that an administrator
    can create collections and tokens in separate steps.

    `instance_state` is not used by this function.
    """
    joined = permissions.model_copy()

    token_name = abstract_configuration.collections[collection].default_token
    # The default token may reference a token that does not exist (yet).
    if token_name not in abstract_configuration.tokens:
        return joined

    token_collections = abstract_configuration.tokens[token_name].collections
    # The default token may exist without being configured for this collection.
    if collection not in token_collections:
        return joined

    default_rights = mode_mapping[TokenModes(token_collections[collection].mode)]
    for flag in ('curated_read', 'incoming_read', 'incoming_write'):
        setattr(
            joined,
            flag,
            getattr(permissions, flag) | getattr(default_rights, flag),
        )
    return joined
|
|
|
|
|
|
def get_on_disk_labels(
    store_path: Path,
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the names of all label directories in a collection's incoming area.

    :param store_path: root directory of the service's stores.
    :param abstract_config: configuration that defines `collection`.
    :param collection: name of the collection; validated with
        ``check_collection`` first.
    :return: names of the immediate subdirectories of
        ``store_path / <incoming>``. The result is an empty set when the
        collection defines no incoming area or the directory does not exist.
    """
    check_collection(abstract_config, collection)

    incoming = abstract_config.collections[collection].incoming
    # Check the configured value rather than the joined path. A ``Path`` is
    # always truthy, ``store_path / ''`` would silently resolve to
    # `store_path` itself (listing unrelated directories), and
    # ``store_path / None`` raises TypeError.
    if not incoming:
        return set()

    incoming_path = store_path / incoming
    if not incoming_path.exists():
        return set()

    return {entry.name for entry in incoming_path.iterdir() if entry.is_dir()}
|
|
|
|
|
|
def authenticate_token(
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> AuthenticationInfo:
    """Authenticate a token against the providers configured for a collection.

    The providers in ``instance_state.auth_sources[collection_name]`` are
    tried in order. The search stops at the first one whose ``authenticate``
    call does not raise ``AuthenticationError``.

    :return: the ``AuthenticationInfo`` returned by that provider.
    :raises HTTPException: status 401 if no provider accepted the token, or
        if the accepting provider returned a falsy result. The detail lists
        each failed provider's class name and error message.
    """
    failures = []
    info = None
    for provider in instance_state.auth_sources[collection_name]:
        logger.debug('trying to authenticate with %s', provider)
        try:
            info = provider.authenticate(token_representation)
        except AuthenticationError as error:
            logger.debug(
                'Authentication provider %s could not '
                'authenticate token for collection %s: %s',
                provider,
                collection_name,
                str(error),
            )
            failures.append(f'{provider.__class__.__name__} failed with: {error}')
        else:
            # The first provider that does not reject the token decides.
            break

    if info:
        return info

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=f'invalid token for collection {collection_name}: ' + ', '.join(
            failures,
        ),
    )
|
|
|
|
|
|
def get_token_store(
    abstract_config: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    token_representation: str,
) -> tuple[_ModelStore | None, TokenPermission, str]:
    """Authenticate a token and return its incoming store for a collection.

    The token is authenticated on every call (via ``authenticate_token``).
    The result is cached in
    ``instance_state.incoming_stores[collection_name][token_representation]``
    as a ``(store, permissions, user_id)`` tuple, and that tuple is returned.

    * If the token has neither incoming-read nor incoming-write permission,
      no store is created and the store element is ``None``.
    * Otherwise a store at
      ``store_path / <collection incoming> / <auth incoming_label>`` is
      created on first use and reused on later calls.

    The permissions and user id in the result always come from the current
    authentication, also when a cached store is reused.

    :raises HTTPException: 401 if authentication fails, or if the token has
        incoming permissions but the collection defines no incoming area.
    """
    # Authenticate with the providers associated with the collection; this
    # raises an HTTP 401 exception if no provider accepts the token.
    auth_info = authenticate_token(
        instance_state,
        collection_name,
        token_representation,
    )
    permissions = auth_info.token_permission

    # Without incoming-read or incoming-write permissions no store is needed.
    if not permissions.incoming_read and not permissions.incoming_write:
        instance_state.incoming_stores[collection_name][token_representation] = (
            None,
            permissions,
            auth_info.user_id,
        )
        return instance_state.incoming_stores[collection_name][token_representation]

    # Check whether the collection has an incoming definition.
    incoming = abstract_config.collections[collection_name].incoming
    if not incoming:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='No incoming area for collection ' + collection_name
        )

    # Reuse an existing store for this collection and token. An entry whose
    # store is ``None`` was cached while the token had no incoming
    # permissions; it must not prevent creating a store now.
    cached = instance_state.incoming_stores[collection_name].get(token_representation)
    if cached is not None and cached[0] is not None:
        token_store = cached[0]
    else:
        store_dir = instance_state.store_path / incoming / auth_info.incoming_label
        token_store = create_token_store(
            abstract_configuration=abstract_config,
            instance_state=instance_state,
            collection_name=collection_name,
            store_dir=store_dir,
        )

    # Always record the permissions and user id from this authentication,
    # so that permission changes are not masked by the cache.
    instance_state.incoming_stores[collection_name][token_representation] = (
        token_store,
        permissions,
        auth_info.user_id,
    )
    return instance_state.incoming_stores[collection_name][token_representation]
|
|
|
|
|
|
def create_store(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    collection_name: str,
) -> _ModelStore:
    """Create the model store for the curated area of a collection.

    Delegates to ``create_token_store`` with the directory
    ``instance_state.store_path / <collection curated path>``.
    """
    curated = abstract_configuration.collections[collection_name].curated
    curated_dir = instance_state.store_path / curated
    return create_token_store(
        abstract_configuration=abstract_configuration,
        instance_state=instance_state,
        collection_name=collection_name,
        store_dir=curated_dir,
    )
|
|
|
|
|
|
def create_token_store(
    abstract_configuration: Configuration,
    instance_state: InstanceState,
    collection_name: str,
    store_dir: Path,
) -> _ModelStore:
    """Create a ``ModelStore`` for `collection_name` rooted at `store_dir`.

    `store_dir` is created (including parents) if necessary. The backend is
    chosen from the collection's configured backend type:

    * ``record_dir``: ``create_record_dir_token_store_backend`` with suffix
      ``'yaml'`` and the backend config's ``mapping_method``;
    * ``sqlite``: ``create_sqlite_token_store_backend``.

    If the backend type carries the ``stl`` extension, the backend is wrapped
    in a ``SchemaTypeLayer``. The store uses the collection's schema and its
    submission tags (``'id'`` and ``'time'``).

    :raises ConfigError: if the backend type is neither ``record_dir`` nor
        ``sqlite``.
    """
    from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
    from dump_things_service.abstract_config import get_backend_and_extension
    from dump_things_service.exceptions import ConfigError
    from dump_things_service.store.model_store import ModelStore

    # Design background: curated and incoming directories may be chosen
    # freely, so an incoming store may coincide with the curated store, and
    # several collections may share directories. Backends on the same
    # directory with the same basic type (`record_dir` or `sqlite`) must
    # therefore use matching schemas; different basic types in one directory
    # may in principle use different schemas. NOTE: this function itself
    # always constructs a new backend; it does not look up existing ones.
    store_dir.mkdir(parents=True, exist_ok=True)

    collection_config = abstract_configuration.collections[collection_name]
    schema_uri = collection_config.schema_location
    backend_config = collection_config.backend
    backend_name, extension = get_backend_and_extension(backend_config.type)

    if backend_name == 'record_dir':
        backend = create_record_dir_token_store_backend(
            store_dir=store_dir,
            order_by=instance_state.order_by,
            schema_uri=schema_uri,
            mapping_function=backend_config.mapping_method,
            suffix='yaml',
        )
    elif backend_name == 'sqlite':
        backend = create_sqlite_token_store_backend(
            store_dir=store_dir,
            order_by=instance_state.order_by,
        )
    else:
        # Only reachable if the configuration names a backend type that this
        # function does not know.
        raise ConfigError(f'Unsupported backend type: `{backend_name}`.')

    if extension == 'stl':
        backend = SchemaTypeLayer(backend=backend, schema=schema_uri)

    tag_config = collection_config.submission_tags
    return ModelStore(
        schema=schema_uri,
        backend=backend,
        tags={
            'id': tag_config.submitter_id_tag,
            'time': tag_config.submission_time_tag,
        },
    )
|
|
|
|
|
|
def create_record_dir_token_store_backend(
    store_dir: Path,
    order_by: list[str],
    schema_uri: str,
    mapping_function: str,
    suffix: str,
) -> _RecordDirStore:
    """Create a ``RecordDirStore`` backend on `store_dir`.

    If `store_dir` has no record-dir configuration file yet, one is written
    via ``write_record_dir_config``; an existing file is left untouched.

    The backend resolves PIDs with the mapping function named
    `mapping_function` and uses file suffix `suffix` and ordering
    `order_by`. ``build_index_if_needed(schema=schema_uri)`` is called on it
    before it is returned.
    """
    from dump_things_service.instance_state import record_dir_config_file_name
    from dump_things_service.backends.record_dir import RecordDirStore

    config_file = store_dir / record_dir_config_file_name
    if not config_file.exists():
        write_record_dir_config(
            path=store_dir,
            mapping_function=mapping_function,
            schema=schema_uri,
        )

    backend = RecordDirStore(
        root=store_dir,
        pid_mapping_function=get_mapping_function_by_name(mapping_function),
        suffix=suffix,
        order_by=order_by,
    )
    backend.build_index_if_needed(schema=schema_uri)
    return backend
|
|
|
|
|
|
def write_record_dir_config(
    path: Path,
    mapping_function: str,
    schema: str,
):
    """Write a record-dir configuration file into `path` unless one exists.

    The file (named ``record_dir_config_file_name``) contains a YAML mapping
    with ``type: records``, ``version: 1``, ``format: yaml``, the given
    `schema`, and the `mapping_function` name under ``idfx``.

    The file is opened in exclusive-create mode (``'x'``). An existing
    configuration is therefore never overwritten, even if another process
    creates it between a caller's existence check and this write. The
    content is written as UTF-8.

    :param path: directory that should contain the configuration file.
    :param mapping_function: name of the PID mapping function.
    :param schema: schema URI of the records in the directory.
    """
    from dump_things_service.instance_state import record_dir_config_file_name

    record_dir_config_file_path = path / record_dir_config_file_name
    content = f"""# RecordDir Config
type: records
version: 1
schema: {schema}
format: yaml
idfx: {mapping_function}
"""
    try:
        with record_dir_config_file_path.open('x', encoding='utf-8') as config_file:
            config_file.write(content)
    except FileExistsError:
        # An existing configuration is authoritative; keep it untouched.
        pass
|
|
|
|
|
|
def create_sqlite_token_store_backend(
    store_dir: Path,
    order_by: list[str],
) -> _SQLiteBackend:
    """Create an ``SQLiteBackend`` whose database file lives in `store_dir`.

    The database file name is ``record_file_name`` from the sqlite backend
    module; `order_by` is passed through to the backend.
    """
    from dump_things_service.backends.sqlite import (
        SQLiteBackend,
        record_file_name,
    )

    database_file = store_dir / record_file_name
    return SQLiteBackend(db_path=database_file, order_by=order_by)
|
|
|
|
|
|
def check_bounds(
    length: int | None,
    max_length: int,
    collection: str,
    alternative_url: str
):
    """Reject result sets that are too large to return without pagination.

    :param length: number of records in the result, or ``None`` if unknown.
        An unknown length is accepted without checking.
    :param max_length: largest number of records allowed in one response.
    :param collection: collection name, used in the error detail.
    :param alternative_url: path suffix of the paginated endpoint, appended
        to ``/<collection>`` in the error detail.
    :raises HTTPException: 413 if `length` exceeds `max_length`.
    """
    # The annotation allows ``None``; comparing it with an int would raise
    # TypeError, so an unknown length is not bounded.
    if length is None:
        return
    if length > max_length:
        raise HTTPException(
            status_code=HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Too many records found in collection '{collection}'. "
                   f'Please use pagination (/{collection}{alternative_url}).',
        )
|
|
|
|
|
|
async def process_token(
    abstract_config: Configuration,
    instance_state: InstanceState,
    api_key: str | None,
    collection: str,
) -> tuple[TokenPermission, _ModelStore]:
    """Resolve the token of a request and check its access to a collection.

    The token configuration is chosen as follows:

    * without an `api_key`, the collection's default token configuration;
    * otherwise, the configuration of the token whose representation is
      `api_key`.

    The token is then authenticated and its store obtained via
    ``get_token_store``. Its permissions are joined with those of the
    collection's default token.

    :return: ``(permissions, store)``. ``store`` is ``None`` when
        ``get_token_store`` created no store for the token.
    :raises HTTPException: one of the following:

        * 401 if no token configuration is found or authentication fails;
        * 503 if the collection is in maintenance mode and the token lacks
          curated read, curated write or zones access;
        * 403 if the token may read neither curated nor incoming data.
    """
    if api_key is not None:
        token_elements = get_token_config_for_representation_and_collection(
            abstract_config,
            collection_name=collection,
            token_representation=api_key,
        )
        token_config = token_elements[1] if token_elements else None
    else:
        token_config = get_default_token_config(abstract_config, collection)

    if not token_config:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail=f"invalid token for collection '{collection}'",
        )

    token_store, token_permissions, _user_id = get_token_store(
        abstract_config,
        instance_state,
        collection,
        token_config.representation,
    )
    permissions = join_default_token_permissions(
        abstract_config, instance_state, token_permissions, collection
    )

    # During maintenance only tokens with full curated and zone rights pass.
    if collection in instance_state.maintenance_mode and not (
        permissions.curated_read
        and permissions.curated_write
        and permissions.zones_access
    ):
        raise HTTPException(
            status_code=HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Collection '{collection}' is in maintenance mode",
        )

    if not (permissions.incoming_read or permissions.curated_read):
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"No read access to curated or incoming data in collection '{collection}'.",
        )

    return permissions, token_store
|
|
|
|
|
|
def get_required_incoming_labels(
    abstract_config: Configuration,
    collection_name: str,
) -> set[str]:
    """Return the incoming labels required for `collection_name`.

    These are the second elements of the ``(token_name, incoming_label)``
    pairs produced by ``get_required_incoming_info``.
    """
    return {
        entry[1]
        for entry in get_required_incoming_info(abstract_config, collection_name)
    }
|
|
|
|
|
|
def get_required_incoming_info(
    abstract_config: Configuration,
    collection_name: str,
) -> set[tuple[str, str]]:
    """Return ``(token_name, incoming_label)`` for tokens that may write incoming data.

    A token contributes a pair for each of its collection entries that
    satisfies both conditions:

    * the entry is named `collection_name`;
    * ``mode_mapping`` maps the entry's mode to a permission whose
      ``incoming_write`` is ``True``.

    The label is that entry's ``incoming_label``.
    """
    required = set()
    for token_name, token_info in abstract_config.tokens.items():
        for entry_name, entry_info in token_info.collections.items():
            if entry_name != collection_name:
                continue
            rights = mode_mapping[TokenModes(entry_info.mode)]
            if rights.incoming_write is True:
                required.add((token_name, entry_info.incoming_label))
    return required
|
|
|
|
|
|
def var_escape(
    name: str,
) -> str:
    """Escape ``_`` and ``-`` in `name` with an unambiguous encoding.

    Every ``_`` becomes ``___`` and every ``-`` becomes ``_0_``; all other
    characters are kept. No replacement is a prefix of another, so distinct
    names always produce distinct results.
    """
    return name.translate(str.maketrans({'_': '___', '-': '_0_'}))
|