dump-things-server/dump_things_service/config.py
Christian Monch 308642e6b0 add use_classes key to collection configuration
If the `use_classes` key is present in a collection
configuration mapping, only the classes described
in `use_classes` key will receive endpoints for
storing and validating.
2025-10-29 14:15:28 +01:00

649 lines
23 KiB
Python

from __future__ import annotations
import dataclasses
import enum
import hashlib
import logging
from functools import partial
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Literal,
)
import yaml
from fastapi import HTTPException
from pydantic import (
BaseModel,
ConfigDict,
Field,
ValidationError,
)
from yaml.scanner import ScannerError
from dump_things_service import (
HTTP_404_NOT_FOUND,
Format,
)
from dump_things_service.backends.record_dir import RecordDirStore
from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
from dump_things_service.backends.sqlite import SQLiteBackend
from dump_things_service.backends.sqlite import (
record_file_name as sqlite_record_file_name,
)
from dump_things_service.converter import FormatConverter, get_conversion_objects
from dump_things_service.exceptions import (
ConfigError,
CurieResolutionError,
)
from dump_things_service.model import get_model_for_schema
from dump_things_service.resolve_curie import resolve_curie
from dump_things_service.store.model_store import ModelStore
from dump_things_service.token import (
TokenPermission,
get_token_parts,
hash_token,
)
from dump_things_service.utils import check_collection
if TYPE_CHECKING:
import types
logger = logging.getLogger('dump_things_service')
config_file_name = '.dumpthings.yaml'
ignored_files = {'.', '..', config_file_name}
_global_config_instance = None
class StrictModel(BaseModel):
model_config = ConfigDict(extra='forbid')
class MappingMethod(enum.Enum):
digest_md5 = 'digest-md5'
digest_md5_p3 = 'digest-md5-p3'
digest_md5_p3_p3 = 'digest-md5-p3-p3'
digest_sha1 = 'digest-sha1'
digest_sha1_p3 = 'digest-sha1-p3'
digest_sha1_p3_p3 = 'digest-sha1-p3-p3'
after_last_colon = 'after-last-colon'
class CollectionDirConfig(StrictModel):
type: Literal['records']
version: Literal[1]
schema: str
format: Literal['yaml']
idfx: MappingMethod
class TokenModes(enum.Enum):
READ_CURATED = 'READ_CURATED'
READ_COLLECTION = 'READ_COLLECTION'
WRITE_COLLECTION = 'WRITE_COLLECTION'
READ_SUBMISSIONS = 'READ_SUBMISSIONS'
WRITE_SUBMISSIONS = 'WRITE_SUBMISSIONS'
SUBMIT = 'SUBMIT'
SUBMIT_ONLY = 'SUBMIT_ONLY'
NOTHING = 'NOTHING'
CURATOR = 'CURATOR'
class TokenCollectionConfig(BaseModel):
model_config = ConfigDict(extra='forbid')
mode: TokenModes
incoming_label: str = Field(strict=True)
class TokenConfig(StrictModel):
user_id: str
collections: dict[str, TokenCollectionConfig]
hashed: bool = False
class BackendConfigRecordDir(StrictModel):
type: Literal['record_dir', 'record_dir+stl']
class BackendConfigSQLite(StrictModel):
type: Literal['sqlite', 'sqlite+stl']
schema: str
class ForgejoAuthConfig(StrictModel):
type: Literal['forgejo']
url: str
organization: str
team: str
label_type: Literal['team', 'user']
repository: str | None = None
class ConfigAuthConfig(StrictModel):
type: Literal['config'] = 'config'
class TagConfig(StrictModel):
submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
class CollectionConfig(StrictModel):
default_token: str
curated: Path
incoming: Path | None = None
backend: BackendConfigRecordDir | BackendConfigSQLite | None = None
auth_sources: list[ForgejoAuthConfig | ConfigAuthConfig] = [ConfigAuthConfig()]
submission_tags: TagConfig = TagConfig()
use_classes: list[str] = dataclasses.field(default_factory=list)
ignore_classes: list[str] = dataclasses.field(default_factory=list)
class GlobalConfig(StrictModel):
model_config = ConfigDict(strict=True)
type: Literal['collections']
version: Literal[1]
collections: dict[str, CollectionConfig]
tokens: dict[str, TokenConfig]
@dataclasses.dataclass
class InstanceConfig:
store_path: Path
collections: dict = dataclasses.field(default_factory=dict)
all_stores: dict = dataclasses.field(default_factory=dict)
curated_stores: dict = dataclasses.field(default_factory=dict)
incoming: dict = dataclasses.field(default_factory=dict)
zones: dict = dataclasses.field(default_factory=dict)
permissions: dict = dataclasses.field(default_factory=dict)
model_info: dict = dataclasses.field(default_factory=dict)
token_stores: dict = dataclasses.field(default_factory=dict)
schemas: dict = dataclasses.field(default_factory=dict)
conversion_objects: dict = dataclasses.field(default_factory=dict)
backend: dict = dataclasses.field(default_factory=dict)
auth_providers: dict = dataclasses.field(default_factory=dict)
tokens: dict = dataclasses.field(default_factory=dict)
hashed_tokens: dict = dataclasses.field(default_factory=dict)
validators: dict = dataclasses.field(default_factory=dict)
use_classes: dict = dataclasses.field(default_factory=dict)
mode_mapping = {
TokenModes.READ_CURATED: TokenPermission(curated_read=True),
TokenModes.READ_COLLECTION: TokenPermission(
curated_read=True,
incoming_read=True,
),
TokenModes.WRITE_COLLECTION: TokenPermission(
curated_read=True,
incoming_read=True,
incoming_write=True,
),
TokenModes.READ_SUBMISSIONS: TokenPermission(incoming_read=True),
TokenModes.WRITE_SUBMISSIONS: TokenPermission(
incoming_read=True,
incoming_write=True,
),
TokenModes.SUBMIT: TokenPermission(curated_read=True, incoming_write=True),
TokenModes.SUBMIT_ONLY: TokenPermission(incoming_write=True),
TokenModes.NOTHING: TokenPermission(),
TokenModes.CURATOR: TokenPermission(
curated_read=True,
incoming_read=True,
incoming_write=True,
curated_write=True,
zones_access=True,
),
}
def get_hex_digest(hasher: Callable, data: str) -> str:
hash_context = hasher(data.encode())
return hash_context.hexdigest()
def mapping_digest_p3(
hasher: Callable,
pid: str,
suffix: str,
) -> Path:
hex_digest = get_hex_digest(hasher, pid)
return Path(hex_digest[:3]) / (hex_digest[3:] + '.' + suffix)
def mapping_digest_p3_p3(
hasher: Callable,
pid: str,
suffix: str,
) -> Path:
hex_digest = get_hex_digest(hasher, pid)
return Path(hex_digest[:3]) / hex_digest[3:6] / (hex_digest[6:] + '.' + suffix)
def mapping_digest(hasher: Callable, pid: str, suffix: str) -> Path:
hex_digest = get_hex_digest(hasher, pid)
return Path(hex_digest + '.' + suffix)
def mapping_after_last_colon(pid: str, suffix: str) -> Path:
plain_result = pid.split(':')[-1]
# Escape any colons and slashes in the pid
escaped_result = (
plain_result.replace('_', '__').replace('/', '_s').replace('.', '_d')
)
return Path(escaped_result + '.' + suffix)
mapping_functions = {
MappingMethod.digest_md5: partial(mapping_digest, hashlib.md5),
MappingMethod.digest_md5_p3: partial(mapping_digest_p3, hashlib.md5),
MappingMethod.digest_md5_p3_p3: partial(mapping_digest_p3_p3, hashlib.md5),
MappingMethod.digest_sha1: partial(mapping_digest, hashlib.sha1),
MappingMethod.digest_sha1_p3: partial(mapping_digest_p3, hashlib.sha1),
MappingMethod.digest_sha1_p3_p3: partial(mapping_digest_p3_p3, hashlib.sha1),
MappingMethod.after_last_colon: mapping_after_last_colon,
}
def get_mapping_function_by_name(mapping_function_name: str) -> Callable:
return mapping_functions[MappingMethod(mapping_function_name)]
def get_mapping_function(collection_config: CollectionDirConfig):
return mapping_functions[collection_config.idfx]
def get_permissions(mode: TokenModes) -> TokenPermission:
return mode_mapping[mode]
class Config:
@staticmethod
def get_config_from_file(path: Path) -> GlobalConfig:
try:
return GlobalConfig(**yaml.load(path.read_text(), Loader=yaml.SafeLoader))
except ScannerError as e:
msg = f'YAML-error while reading config file {path}: {e}'
raise ConfigError(msg) from e
except TypeError:
msg = f'Error in yaml file {path}: content is not a mapping'
raise ConfigError(msg) from None
except ValidationError as e:
msg = f'Pydantic-error reading config file {path}: {e}'
raise ConfigError(msg) from e
@staticmethod
def get_config(path: Path, file_name=config_file_name) -> GlobalConfig:
return Config.get_config_from_file(path / file_name)
@staticmethod
def get_collection_dir_config(
path: Path,
file_name: str = config_file_name,
) -> CollectionDirConfig:
config_path = path / file_name
if not config_path.exists():
msg = f'Config file does not exist: {config_path}'
raise ConfigError(msg)
try:
return CollectionDirConfig(
**yaml.load(config_path.read_text(), Loader=yaml.SafeLoader)
)
except ScannerError as e:
msg = f'YAML-error while reading config file {config_path}: {e}'
raise ConfigError(msg) from e
except ValidationError as e:
msg = f'Pydantic-error reading config file {config_path}: {e}'
raise ConfigError(msg) from e
def process_config(
store_path: Path,
config_file: Path,
order_by: list[str],
globals_dict: dict[str, Any],
) -> InstanceConfig:
global global_config_instance
config_object = Config.get_config_from_file(config_file)
global_config_instance = process_config_object(
store_path=store_path,
config_object=config_object,
order_by=order_by,
globals_dict=globals_dict,
)
return global_config_instance
def get_config():
return global_config_instance
def process_config_object(
store_path: Path,
config_object: GlobalConfig,
order_by: list[str],
globals_dict: dict[str, Any],
):
from dump_things_service.auth.config import ConfigAuthenticationSource
from dump_things_service.auth.forgejo import ForgejoAuthenticationSource
instance_config = InstanceConfig(store_path=store_path)
instance_config.collections = config_object.collections
for collection_name, collection_info in config_object.collections.items():
# Create the authentication providers
instance_config.auth_providers[collection_name] = []
auth_provider_list = []
# Check for multiple providers
for auth_provider in collection_info.auth_sources:
if auth_provider.type == 'config':
key = ('config',)
elif auth_provider.type == 'forgejo':
key = (
'forgejo',
auth_provider.url,
auth_provider.organization,
auth_provider.team,
auth_provider.label_type,
auth_provider.repository,
)
else:
msg = f'Unknown authentication provider type: {auth_provider.type}'
raise ConfigError(msg)
if key in auth_provider_list:
logger.warning('Ignoring duplicated authentication provider: %s', key)
continue
auth_provider_list.append(key)
for auth_provider in auth_provider_list:
if auth_provider[0] == 'config':
instance_config.auth_providers[collection_name].append(
ConfigAuthenticationSource(
instance_config=instance_config,
collection=collection_name,
)
)
else:
instance_config.auth_providers[collection_name].append(
ForgejoAuthenticationSource(*auth_provider[1:])
)
# Set the default backend if not specified
backend = collection_info.backend or BackendConfigRecordDir(
type='record_dir+stl'
)
instance_config.backend[collection_name] = backend
backend_name, extension = get_backend_and_extension(backend.type)
if backend_name == 'record_dir':
# Get the config from the curated directory
collection_config = Config.get_collection_dir_config(
store_path / collection_info.curated
)
schema = collection_config.schema
elif backend.type == 'sqlite':
schema = backend.schema
else:
msg = f'Unsupported backend `{collection_info.backend}` for collection `{collection_name}`.'
raise ConfigError(msg)
# Generate the collection model
model, classes, model_var_name = get_model_for_schema(schema)
instance_config.model_info[collection_name] = model, classes, model_var_name
globals_dict[model_var_name] = model
# Generate the curated stores
if backend_name == 'record_dir':
curated_store_backend = RecordDirStore(
root=store_path / collection_info.curated,
pid_mapping_function=get_mapping_function(collection_config),
suffix=collection_config.format,
order_by=order_by,
)
curated_store_backend.build_index_if_needed(schema=schema)
elif backend.type == 'sqlite':
curated_store_backend = SQLiteBackend(
db_path=store_path / collection_info.curated / sqlite_record_file_name,
)
else:
msg = f'Unsupported backend `{collection_info.backend}` for collection `{collection_name}`.'
raise ConfigError(msg)
if extension == 'stl':
curated_store_backend = SchemaTypeLayer(
backend=curated_store_backend,
schema=schema,
)
curated_store = ModelStore(
schema=schema,
backend=curated_store_backend,
tags={
'id': collection_info.submission_tags.submitter_id_tag,
'time': collection_info.submission_tags.submission_time_tag,
}
)
instance_config.curated_stores[collection_name] = curated_store
if collection_info.incoming:
instance_config.incoming[collection_name] = collection_info.incoming
instance_config.schemas[collection_name] = schema
if schema not in instance_config.conversion_objects:
instance_config.conversion_objects[schema] = get_conversion_objects(schema)
# We do not create stores for tokens here, but leave it to the token
# authentication routine.
instance_config.token_stores[collection_name] = {}
# Create validator for each collection
for collection_name, _ in config_object.collections.items():
instance_config.validators[collection_name] = FormatConverter(
schema=instance_config.schemas[collection_name],
input_format=Format.json,
output_format=Format.ttl,
)
# Resolve classes-blacklist and -whitelist
for collection_name, collection_info in config_object.collections.items():
model_info = instance_config.model_info[collection_name]
# If the whitelist is present, get all whitelisted classes
if collection_info.use_classes:
# Check that the whitelisted classes exist
undefined = [
name
for name in collection_info.use_classes
if name not in model_info[1]
]
if undefined:
msg = (
'used class(es): '
+ ', '.join(undefined)
+ ' not defined in schema: '
+ model_info[0].linkml_meta.root['id']
)
raise ConfigError(msg)
use_classes = collection_info.use_classes
else:
use_classes = model_info[1]
# Check for blacklisted classes
undefined = [
name
for name in collection_info.ignore_classes
if name not in use_classes
]
if undefined:
msg = (
'ignored class(es): '
+ ', '.join(undefined)
+ ' not defined in schema or in `used_classes`: '
+ model_info[0].linkml_meta.root['id']
)
raise ConfigError(msg)
instance_config.use_classes[collection_name] = [
name
for name in use_classes
if name not in collection_info.ignore_classes
]
# Read info for tokens from the configuration
for token_name, token_info in config_object.tokens.items():
for collection_name, token_collection_info in token_info.collections.items():
if collection_name not in instance_config.hashed_tokens:
instance_config.hashed_tokens[collection_name] = {}
if token_info.hashed:
token_id, _ = get_token_parts(token_name)
if token_id == '':
msg = 'empty ID in hashed token'
raise ConfigError(msg)
if token_id in instance_config.hashed_tokens[collection_name]:
msg = f'duplicated ID in hashed token: {token_id}'
raise ConfigError(msg)
instance_config.hashed_tokens[collection_name][token_id] = token_name
if collection_name not in instance_config.tokens:
instance_config.tokens[collection_name] = {}
permissions = get_permissions(token_collection_info.mode)
instance_config.tokens[collection_name][token_name] = {
'permissions': permissions,
'user_id': token_info.user_id,
'incoming_label': token_collection_info.incoming_label,
}
# There is only a token store if the token has incoming read- or
# incoming write-permissions. If a token store exists, we ensure
# that an incoming path is set and an incoming label exists.
if permissions.incoming_read or permissions.incoming_write:
# Check that the incoming label is set for a token that has
# access rights to incoming records.
if not token_collection_info.incoming_label:
msg = f'Token `{token_name}` with mode {token_collection_info.mode} must not have an empty `incoming_label`'
raise ConfigError(msg)
if any(c in token_collection_info.incoming_label for c in ('\\', '/')):
msg = (
f'Incoming label for token `...` on collection '
f'`{collection_name}` must not contain slashes or '
f'backslashes: `{token_collection_info.incoming_label}`'
)
raise ConfigError(msg)
if collection_name not in instance_config.incoming:
msg = (
'Incoming location not defined for collection '
f'`{collection_name}`, which has at least one token '
f'with write access'
)
raise ConfigError(msg)
# Create all incoming zones
incoming_location = (
store_path
/ instance_config.collections[collection_name].incoming
/ token_collection_info.incoming_label
)
incoming_location.mkdir(parents=True, exist_ok=True)
# Check that default tokens are defined
for collection_name, collection_info in config_object.collections.items():
if collection_info.default_token not in instance_config.tokens[collection_name]:
msg = f'Unknown default token: `{collection_info.default_token}`'
raise ConfigError(msg)
# Check that config authentication source is present if tokens are defined
# in the config file
for collection_name, _ in config_object.collections.items():
config_tokens = instance_config.tokens.get(collection_name, {})
if config_tokens:
if not any(
isinstance(auth_source, ConfigAuthenticationSource)
for auth_source in instance_config.auth_providers[collection_name]
):
msg = (
f'Collection `{collection_name}` has tokens defined in '
'configuration file, but no `config` authentication source'
)
raise ConfigError(msg)
# Check that hashed plain tokens do not clash with hashed tokens:
hashed_plain_tokens = {
hash_token(token)
for collection in instance_config.collections
for token in instance_config.tokens[collection]
if '-' in token
}
hashed_tokens = {
value
for token_dict in instance_config.hashed_tokens.values()
for value in token_dict.values()
}
if hashed_plain_tokens.intersection(hashed_tokens):
msg = 'plain tokens clash with hashed tokens'
raise ConfigError(msg)
# Check tags
for collection_name, collection_info in config_object.collections.items():
module = instance_config.model_info[collection_name][0]
try:
resolve_curie(module, collection_info.submission_tags.submission_time_tag)
except CurieResolutionError as e:
raise ConfigError(str(e)) from e
return instance_config
def get_backend_and_extension(backend_type: str) -> tuple[str, str]:
elements = backend_type.split('+')
return (elements[0], elements[1]) if len(elements) > 1 else (elements[0], '')
def get_zone(
instance_config: InstanceConfig,
collection: str,
token: str,
) -> str | None:
"""Get the zone for the given collection and token."""
if collection not in instance_config.zones:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f'No incoming zone defined for collection: {collection}',
)
if token not in instance_config.zones[collection]:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f'Missing incoming_label for given token in collection: {collection}',
)
return instance_config.zones[collection][token]
def get_conversion_objects_for_collection(
instance_config: InstanceConfig,
collection_name: str,
) -> dict:
"""Get the conversion objects for the given collection."""
check_collection(instance_config, collection_name)
return instance_config.conversion_objects[instance_config.schemas[collection_name]]
def get_model_info_for_collection(
instance_config: InstanceConfig,
collection_name: str,
) -> tuple[types.ModuleType, dict[str, Any], str]:
check_collection(instance_config, collection_name)
return instance_config.model_info[collection_name]