dump-things-server/dump_things_service/abstract_config.py
2026-06-12 12:41:54 +02:00

497 lines
14 KiB
Python

import enum
import hashlib
import hmac
import logging
from functools import partial
from pathlib import (
    Path,
    PurePosixPath,
)
from typing import (
    Callable,
    Iterable,
    Literal,
    cast,
)

from fastapi import HTTPException
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    ValidationError,
)
from yaml import YAMLError
from yaml.scanner import ScannerError

from dump_things_service import (
    HTTP_404_NOT_FOUND,
    dump_things_private_collection_name,
)
from dump_things_service.audit.gitaudit import GitAuditBackend
from dump_things_service.backends.record_dir import (
    _RecordDirStore,
    RecordDirStore,
)
from dump_things_service.exceptions import ConfigError
# Module logger, registered under the fixed name 'dump_things_service'.
logger = logging.getLogger('dump_things_service')
# Cached configuration object; assigned by `read_config()` and
# `store_config()`, read by `get_config()`. `None` until first loaded.
g_abstract_configuration = None
# IRI under which the configuration record is stored and looked up.
dump_things_config_iri = 'dump_things:config'
# Relative paths (below a store root) of the private area, the record store
# that holds the configuration, and the audit directory for config changes.
dump_things_private_path = Path(dump_things_private_collection_name)
config_backend_path = dump_things_private_path / 'config_store'
config_audit_path = dump_things_private_path / 'config_audit'
# Backend instances created on first use by `get_config_backends()`.
config_backend = None
config_audit = None
class StrictModel(BaseModel):
    """Common base for configuration models.

    Configured with ``extra='forbid'`` (unknown keys are a validation error)
    and ``use_enum_values=True`` (enum-typed fields hold the enum's value).
    """

    model_config = ConfigDict(extra='forbid', use_enum_values=True)
class ConfigAuthSpec(BaseModel):
    """Authentication-source entry whose ``type`` is fixed to ``'config'``.

    This is the default entry of `CollectionConfig.auth_sources`.
    """

    # Discriminator; the only accepted value (and the default) is 'config'.
    type: Literal['config'] = 'config'
class ForgejoAuthSpec(BaseModel):
    """Authentication-source entry whose ``type`` is fixed to ``'forgejo'``.

    NOTE(review): the meaning of the individual fields is not visible in this
    module; presumably they identify a Forgejo instance and the
    organization/team used for authorization -- confirm against the
    authentication code that consumes this spec.
    """

    # Discriminator; must be 'forgejo' (no default, so it is required).
    type: Literal['forgejo']
    # Required settings.
    url: str
    organization: str
    team: str
    label_type: Literal['team', 'user']
    # Optional settings, omitted by default.
    instance_id: str | None = None
    repository: str | None = None
class TagSpec(BaseModel):
    """IRIs used to tag submissions; both fields have defaults."""

    # Tag for the submitter id (defaults to an NCIT term IRI).
    submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
    # Tag for the submission time (defaults to an SIO term IRI).
    submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
class MappingMethod(enum.Enum):
    """Names of the supported PID-to-file-path mapping methods.

    Each member is a key of the module-level `mapping_functions` table; the
    member value is the string used in configuration files.
    """

    # MD5-based variants.
    digest_md5 = 'digest-md5'
    digest_md5_p3 = 'digest-md5-p3'
    digest_md5_p3_p3 = 'digest-md5-p3-p3'
    # SHA-1-based variants.
    digest_sha1 = 'digest-sha1'
    digest_sha1_p3 = 'digest-sha1-p3'
    digest_sha1_p3_p3 = 'digest-sha1-p3-p3'
    # Non-digest variant: derive the name from the text after the last colon.
    after_last_colon = 'after-last-colon'
class RecordDirBackendConfig(StrictModel):
    """Backend settings for the ``record_dir`` / ``record_dir+stl`` backends."""

    # NOTE(review): `StrictModel` already sets `use_enum_values=True`; this
    # override looks redundant -- confirm how pydantic merges inherited config
    # before removing it.
    model_config = ConfigDict(use_enum_values=True)

    # Backend selector; a '+stl' suffix is split off by
    # `get_backend_and_extension()`.
    type: Literal['record_dir', 'record_dir+stl']
    # Stored as a plain string; it is converted to `MappingMethod` only when
    # `get_mapping_function()` is called, so an unknown name fails there.
    mapping_method: str = MappingMethod.digest_md5.value
class SQLiteBackendConfig(StrictModel):
    """Backend settings for the ``sqlite`` / ``sqlite+stl`` backends."""

    # Backend selector; the only field of this model.
    type: Literal['sqlite', 'sqlite+stl']
class GitAuditBackendConfig(StrictModel):
    """Settings for an audit backend of type ``'gitaudit'``."""

    # Discriminator; must be 'gitaudit'.
    type: Literal['gitaudit']
    # Location of the audit backend (required).
    path: Path
    # Defaults to 60. NOTE(review): unit is not visible here, presumably
    # seconds -- confirm against GitAuditBackend.
    auto_flush_timeout: int = 60
class CollectionConfig(BaseModel):
    """Configuration of a single collection.

    Unknown keys are rejected (``extra='forbid'``). The schema location is
    read from, and (when dumping by alias) written to, the key ``schema``.
    """

    model_config = ConfigDict(extra='forbid', use_enum_values=True)

    # --- required settings ---
    # Name of the token returned by `get_default_token_name()`.
    default_token: str
    curated: PurePosixPath
    schema_location: str = Field(alias='schema')

    # --- optional settings ---
    incoming: PurePosixPath | None = None
    # Defaults to a record-dir backend of type 'record_dir+stl'.
    backend: RecordDirBackendConfig | SQLiteBackendConfig = RecordDirBackendConfig(type='record_dir+stl')
    # Defaults to a single 'config' authentication source.
    auth_sources: list[ForgejoAuthSpec | ConfigAuthSpec] = [ConfigAuthSpec()]
    audit_backends: list[GitAuditBackendConfig] = []
    submission_tags: TagSpec = TagSpec()
    use_classes: list[str] = []
    ignore_classes: list[str] = []
class RecordDirConfigFileContent(BaseModel):
    """Expected content of a record-directory configuration file.

    Unknown keys are rejected. Only ``type='records'``, ``version=1`` and
    ``format='yaml'`` are accepted.
    """

    model_config = ConfigDict(extra='forbid')

    type: Literal['records']
    version: Literal[1]
    # Provided under the key 'schema' in the file.
    schema_location: str = Field(alias='schema')
    format: Literal['yaml']
    # Mapping method, validated against `MappingMethod`.
    idfx: MappingMethod
class TokenModes(enum.Enum):
    """Access modes a token can have in a collection.

    Every member's value equals its name. The permissions granted by each
    mode are defined in the module-level `mode_mapping` table.
    """

    READ_CURATED = 'READ_CURATED'
    READ_COLLECTION = 'READ_COLLECTION'
    WRITE_COLLECTION = 'WRITE_COLLECTION'
    READ_SUBMISSIONS = 'READ_SUBMISSIONS'
    WRITE_SUBMISSIONS = 'WRITE_SUBMISSIONS'
    SUBMIT = 'SUBMIT'
    SUBMIT_ONLY = 'SUBMIT_ONLY'
    NOTHING = 'NOTHING'
    CURATOR = 'CURATOR'
class TokenPermission(BaseModel):
    """Set of permission flags; every flag defaults to ``False``."""

    # Read/write flags for the curated area.
    curated_read: bool = False
    # Read/write flags for the incoming area.
    incoming_read: bool = False
    incoming_write: bool = False
    curated_write: bool = False
    # Only enabled by the CURATOR mode in `mode_mapping`.
    zones_access: bool = False
class TokenCollectionConfig(StrictModel):
    """Per-collection settings of a token: access mode and incoming label."""

    # Repeats the settings already declared on `StrictModel`.
    model_config = ConfigDict(extra='forbid', use_enum_values=True)

    # Access mode; stored as the enum's value because of `use_enum_values`.
    mode: TokenModes
    # Empty by default; `get_config_labels()` skips empty labels.
    incoming_label: str = ''
class TokenConfig(StrictModel):
    """Configuration of one token."""

    user_id: str
    # Per-collection settings, keyed by collection name.
    collections: dict[str, TokenCollectionConfig]
    # When true, `representation` is compared against the SHA-256 hex digest
    # of a presented token (see `get_token_info_by_representation()`).
    hashed: bool = False
    representation: str = ''
class AdminTokenConfig(StrictModel):
    """Configuration of one admin token; only its representation is stored."""

    representation: str
class Configuration(StrictModel):
    """Top-level service configuration.

    All sections default to empty, so ``Configuration()`` is a valid, empty
    configuration.
    """

    # Collections, keyed by collection name.
    collections: dict[str, CollectionConfig] = {}
    # Tokens, keyed by token name.
    tokens: dict[str, TokenConfig] = {}
    # Admin tokens, keyed by name.
    admin_tokens: dict[str, AdminTokenConfig] = {}
    # Identifier of the configuration record; defaults to the config IRI.
    pid: str = dump_things_config_iri
# Permission flags granted by each token mode. Flags that are not listed for
# a mode keep their `TokenPermission` default of False.
mode_mapping = {
    # Read-only modes.
    TokenModes.READ_CURATED: TokenPermission(
        curated_read=True,
    ),
    TokenModes.READ_COLLECTION: TokenPermission(
        curated_read=True,
        incoming_read=True,
    ),
    # Curated read plus full access to incoming records.
    TokenModes.WRITE_COLLECTION: TokenPermission(
        curated_read=True,
        incoming_read=True,
        incoming_write=True,
    ),
    # Modes restricted to incoming records.
    TokenModes.READ_SUBMISSIONS: TokenPermission(
        incoming_read=True,
    ),
    TokenModes.WRITE_SUBMISSIONS: TokenPermission(
        incoming_read=True,
        incoming_write=True,
    ),
    # Submission modes: may write incoming records without reading them.
    TokenModes.SUBMIT: TokenPermission(
        curated_read=True,
        incoming_write=True,
    ),
    TokenModes.SUBMIT_ONLY: TokenPermission(
        incoming_write=True,
    ),
    # No permissions at all.
    TokenModes.NOTHING: TokenPermission(),
    # Every flag enabled.
    TokenModes.CURATOR: TokenPermission(
        curated_read=True,
        incoming_read=True,
        incoming_write=True,
        curated_write=True,
        zones_access=True,
    ),
}
def get_token_permissions(mode: str) -> TokenPermission:
    """Return the permission flags that belong to the token mode `mode`.

    :param mode: value of a `TokenModes` member (e.g. ``'SUBMIT'``)
    :return: the shared `TokenPermission` instance from `mode_mapping`
    :raises ValueError: if `mode` is not a valid `TokenModes` value
    """
    token_mode = TokenModes(mode)
    return mode_mapping[token_mode]
def get_config_backends(
    store_path: Path,
) -> tuple[_RecordDirStore, GitAuditBackend]:
    """Return the record store and audit backend that hold the configuration.

    The directories ``store_path / config_backend_path`` and
    ``store_path / config_audit_path`` are created if they do not exist.

    The backend objects are created on the first call and cached in the
    module globals `config_backend` and `config_audit`. Later calls return the
    cached objects, even when they are given a different `store_path`.

    :param store_path: root directory of the store
    :return: tuple ``(config record store, config audit backend)``
    """
    global config_audit
    global config_backend

    config_path = store_path / config_backend_path
    # `exist_ok=True` replaces a separate `exists()` check, which left a
    # window in which another process could create the directory first.
    config_path.mkdir(parents=True, exist_ok=True)
    if config_backend is None:
        config_backend = RecordDirStore(
            config_path,
            mapping_functions[MappingMethod.digest_md5],
            'yaml',
        )

    audit_path = store_path / config_audit_path
    audit_path.mkdir(parents=True, exist_ok=True)
    if config_audit is None:
        config_audit = GitAuditBackend(audit_path)

    return config_backend, config_audit
def read_config(
    store_path: Path,
    force_reload: bool = False,
) -> Configuration:
    """Return the service configuration, loading it from the store if needed.

    The configuration is cached in the module global
    `g_abstract_configuration`. It is (re-)read from the configuration record
    store when nothing is cached yet or when `force_reload` is true. If the
    store holds no configuration record, an empty `Configuration` is used.

    :param store_path: root directory of the store
    :param force_reload: read from the store even if a configuration is cached
    :return: the (now cached) configuration
    :raises ConfigError: if the stored record cannot be parsed as YAML or
        does not validate against `Configuration`
    """
    global g_abstract_configuration

    if g_abstract_configuration and not force_reload:
        return g_abstract_configuration

    backend, _ = get_config_backends(store_path)

    try:
        record_info = backend.get_record_by_iri(dump_things_config_iri)
    except YAMLError as yaml_error:
        # `YAMLError` is the common base of PyYAML's errors, including the
        # `ScannerError` that was handled here before, so parser and
        # constructor failures are reported as `ConfigError` as well.
        msg = f'Configuration at {backend.root} not readable: {yaml_error}'
        raise ConfigError(msg) from yaml_error

    try:
        g_abstract_configuration = (
            Configuration(**(record_info.json_object))
            if record_info
            else Configuration()
        )
    except ValidationError as ve:
        msg = f'Faulty configuration at {backend.root}: {ve}'
        raise ConfigError(msg) from ve

    return g_abstract_configuration
def get_config() -> Configuration:
    """Return the cached configuration.

    :raises RuntimeError: if no configuration has been loaded or stored yet
    """
    if g_abstract_configuration:
        return cast(Configuration, g_abstract_configuration)
    msg = 'Configuration not yet loaded'
    raise RuntimeError(msg)
def store_config(
    store_path,
    config: Configuration,
):
    """Persist `config`, record it in the audit backend, and cache it.

    The configuration is dumped in JSON mode, by alias and without ``None``
    values; its ``pid`` is forced to `dump_things_config_iri`. The cached
    configuration is replaced only after both backends accepted the record.

    :param store_path: root directory of the store
    :param config: the configuration to store
    """
    global g_abstract_configuration

    record_store, audit_log = get_config_backends(store_path)

    serialized = config.model_dump(mode='json', exclude_none=True, by_alias=True)
    # The stored record is always identified by the well-known config IRI.
    serialized['pid'] = dump_things_config_iri

    record_store.add_record(
        iri=dump_things_config_iri,
        class_name='DumpThingsConfig',
        json_object=serialized,
    )
    audit_log.add_record(
        record=serialized,
        committer_id='__dump_things_server__',
    )

    g_abstract_configuration = config
def tokens_for_collection(
    config: Configuration,
    collection: str,
) -> Iterable[TokenConfig]:
    """Yield every token configuration that has an entry for `collection`.

    Tokens are yielded in the order in which they appear in `config.tokens`.
    """
    for token in config.tokens.values():
        if collection in token.collections:
            yield token
def check_collection(
    abstract_config: Configuration,
    collection: str,
):
    """Ensure that `collection` is a configured collection.

    :raises HTTPException: with status 404 if the collection is unknown
    """
    if collection in abstract_config.collections:
        return
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection}'.",
    )
def check_label(
    store_path: Path,
    abstract_config: Configuration,
    collection: str,
    label: str,
):
    """Check that a label exists in a collection configuration or on disk.

    :param store_path: root directory of the store
    :param abstract_config: the configuration to check against
    :param collection: name of the collection
    :param label: incoming label to look for
    :raises HTTPException: with status 404 if `collection` is unknown, or if
        `label` is neither configured for a token of the collection nor
        reported by `get_on_disk_labels()`
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from dump_things_service.utils import get_on_disk_labels

    if (
        label not in get_config_labels(abstract_config, collection)
        and label not in get_on_disk_labels(store_path, abstract_config, collection)
    ):
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"No incoming label: '{label}' in collection: '{collection}'.",
        )
def get_config_labels(
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the non-empty incoming labels configured for `collection`.

    :raises HTTPException: with status 404 if the collection is unknown
    """
    check_collection(abstract_config, collection)

    labels = set()
    for token in tokens_for_collection(abstract_config, collection):
        incoming_label = token.collections[collection].incoming_label
        # Tokens without an incoming label carry the empty string; skip them.
        if incoming_label:
            labels.add(incoming_label)
    return labels
def get_default_token_name(
    abstract_config: Configuration,
    collection: str
) -> str:
    """Return the name of the default token of `collection`.

    :raises HTTPException: with status 404 if the collection is unknown
    """
    check_collection(abstract_config, collection)
    collection_config = abstract_config.collections[collection]
    return collection_config.default_token
def get_token_info_by_representation(
    abstract_config: Configuration,
    token_representation: str,
) -> tuple[str, TokenConfig] | None:
    """Find the configured token that matches `token_representation`.

    For tokens marked as ``hashed`` the SHA-256 hex digest of
    `token_representation` is compared with the stored representation; for
    all other tokens the plain text is compared.

    :param abstract_config: the configuration to search
    :param token_representation: the token as presented by a client
    :return: ``(token name, token configuration)`` of the first matching
        token, or ``None`` if no token matches
    """
    plain_candidate = token_representation.encode()
    hashed_candidate = hash_token_representation(token_representation).encode()

    for token_name, token_config in abstract_config.tokens.items():
        candidate = hashed_candidate if token_config.hashed else plain_candidate
        # Constant-time comparison, so response timing does not reveal how
        # many leading characters of a secret were guessed correctly. Bytes
        # are compared because `compare_digest` rejects non-ASCII `str`.
        # NOTE(review): a non-hashed token left at the default representation
        # '' matches an empty presented token (unchanged behavior) -- confirm
        # that callers reject empty tokens.
        if hmac.compare_digest(candidate, token_config.representation.encode()):
            return token_name, token_config
    return None
def hash_token_representation(
    token_representation: str,
) -> str:
    """Return the SHA-256 hex digest of the encoded `token_representation`."""
    digest = hashlib.sha256()
    digest.update(token_representation.encode())
    return digest.hexdigest()
def get_token_config_by_name(
    abstract_config: Configuration,
    token_name: str,
) -> TokenConfig | None:
    """Return the configuration of the token `token_name`, or ``None``."""
    known_tokens = abstract_config.tokens
    return known_tokens.get(token_name)
def get_token_infos_for_collection(
    abstract_config: Configuration,
    collection_name: str,
) -> Iterable[tuple[str, TokenConfig, TokenCollectionConfig]]:
    """Yield information about every token that is configured for a collection.

    :param abstract_config: the configuration to search
    :param collection_name: name of the collection
    :return: an iterable of ``(token name, token configuration, the token's
        settings for `collection_name`)`` tuples, in the order in which the
        tokens appear in the configuration; tokens without an entry for
        `collection_name` are skipped
    """
    for token_name, token_config in abstract_config.tokens.items():
        # Look the collection entry up once. The entry itself must not be
        # iterated, and the tuples must not be collected in a set because
        # the configuration models are not hashable.
        token_collection_config = token_config.collections.get(collection_name)
        if token_collection_config is not None:
            yield token_name, token_config, token_collection_config
def get_token_config_for_representation_and_collection(
    abstract_config: Configuration,
    collection_name: str,
    token_representation: str,
) -> tuple[str, TokenConfig, TokenCollectionConfig] | None:
    """Resolve a presented token and its settings for one collection.

    :return: ``(token name, token configuration, the token's settings for
        `collection_name`)``, or ``None`` if the token is unknown or has no
        entry for the collection
    """
    token_info = get_token_info_by_representation(
        abstract_config=abstract_config,
        token_representation=token_representation,
    )
    if not token_info:
        return None

    token_name, token_config = token_info
    if collection_name not in token_config.collections:
        return None
    return token_name, token_config, token_config.collections[collection_name]
def get_collection_config_by_name(
    abstract_config: Configuration,
    collection_name: str,
) -> CollectionConfig:
    """Return the configuration of the collection `collection_name`.

    :raises HTTPException: with status 404 if the collection is unknown
    """
    found = abstract_config.collections.get(collection_name)
    if found:
        return found
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection_name}'",
    )
def get_default_token_config(
    abstract_config: Configuration,
    collection: str,
) -> TokenConfig | None:
    """Return the configuration of the default token of `collection`.

    :return: the token configuration, or ``None`` if the collection names a
        default token that is not defined in the configuration
    :raises HTTPException: with status 404 if the collection is unknown
    """
    collection_config = get_collection_config_by_name(abstract_config, collection)
    return get_token_config_by_name(
        abstract_config,
        collection_config.default_token,
    )
def get_default_token_representation(
    abstract_config: Configuration,
    collection: str,
) -> str | None:
    """Return the representation of the default token of `collection`.

    :return: the representation, or ``None`` if no configuration exists for
        the collection's default token
    :raises HTTPException: with status 404 if the collection is unknown
    """
    token_config = get_default_token_config(abstract_config, collection)
    if token_config:
        return token_config.representation
    return None
def get_mapping_function(record_dir_backend_config: RecordDirBackendConfig):
    """Return the mapping function selected by a record-dir backend config.

    :raises ValueError: if the configured `mapping_method` is not a
        `MappingMethod` value
    """
    method = MappingMethod(record_dir_backend_config.mapping_method)
    return mapping_functions[method]
def get_backend_and_extension(backend_type: str) -> tuple[str, str]:
    """Split a backend type such as ``'record_dir+stl'`` at ``'+'``.

    :return: ``(backend, extension)``; the extension is ``''`` when
        `backend_type` contains no ``'+'``. Only the first two
        ``'+'``-separated parts are used, any further parts are ignored.
    """
    backend, *extensions = backend_type.split('+')
    return backend, (extensions[0] if extensions else '')
def get_hex_digest(hasher: Callable, data: str) -> str:
    """Return the hex digest of the encoded `data`, computed with `hasher`.

    :param hasher: a hash constructor such as ``hashlib.md5``; it is called
        with the encoded data
    """
    return hasher(data.encode()).hexdigest()
def mapping_digest_p3(
    hasher: Callable,
    pid: str,
    suffix: str,
) -> Path:
    """Map `pid` to ``<first 3 hex digits>/<remaining digits>.<suffix>``.

    The hex digits are those of the digest of `pid` computed with `hasher`.
    """
    hex_digest = get_hex_digest(hasher, pid)
    directory, file_stem = hex_digest[:3], hex_digest[3:]
    return Path(directory) / f'{file_stem}.{suffix}'
def mapping_digest_p3_p3(
    hasher: Callable,
    pid: str,
    suffix: str,
) -> Path:
    """Map `pid` to ``<digits 0-2>/<digits 3-5>/<remaining digits>.<suffix>``.

    The hex digits are those of the digest of `pid` computed with `hasher`.
    """
    hex_digest = get_hex_digest(hasher, pid)
    outer_directory = hex_digest[:3]
    inner_directory = hex_digest[3:6]
    file_name = f'{hex_digest[6:]}.{suffix}'
    return Path(outer_directory) / inner_directory / file_name
def mapping_digest(hasher: Callable, pid: str, suffix: str) -> Path:
    """Map `pid` to the flat path ``<hex digest of pid>.<suffix>``."""
    return Path(f'{get_hex_digest(hasher, pid)}.{suffix}')
def mapping_after_last_colon(pid: str, suffix: str) -> Path:
    """Map `pid` to ``<escaped text after the last colon>.<suffix>``.

    If `pid` contains no colon, the whole `pid` is used. Underscores, slashes
    and dots are escaped as ``'__'``, ``'_s'`` and ``'_d'`` respectively.
    """
    # One translation pass is equivalent to replacing '_', '/' and '.' one
    # after the other, because no replacement text contains a character that
    # a later replacement would touch.
    escapes = str.maketrans({'_': '__', '/': '_s', '.': '_d'})
    local_part = pid.rpartition(':')[2]
    return Path(f'{local_part.translate(escapes)}.{suffix}')
# Lookup table from mapping method to the function that implements it.
# After binding the hasher (where applicable), every function is called as
# `function(pid, suffix)` and returns a relative `Path`.
mapping_functions = {
    # MD5-based layouts: flat, one directory level, two directory levels.
    MappingMethod.digest_md5: partial(mapping_digest, hashlib.md5),
    MappingMethod.digest_md5_p3: partial(mapping_digest_p3, hashlib.md5),
    MappingMethod.digest_md5_p3_p3: partial(mapping_digest_p3_p3, hashlib.md5),
    # SHA-1-based layouts: flat, one directory level, two directory levels.
    MappingMethod.digest_sha1: partial(mapping_digest, hashlib.sha1),
    MappingMethod.digest_sha1_p3: partial(mapping_digest_p3, hashlib.sha1),
    MappingMethod.digest_sha1_p3_p3: partial(mapping_digest_p3_p3, hashlib.sha1),
    # Name derived from the PID text itself; no hashing involved.
    MappingMethod.after_last_colon: mapping_after_last_colon,
}
def get_mapping_function_by_name(mapping_function_name: str) -> Callable:
    """Return the mapping function for a method name such as ``'digest-md5'``.

    :raises ValueError: if `mapping_function_name` is not a `MappingMethod`
        value
    """
    method = MappingMethod(mapping_function_name)
    return mapping_functions[method]