dump-things-server/dump_things_service/abstract_config.py
Christian Monch 0225d8647b
Some checks failed
Test execution / Test-all (push) Failing after 1m34s
[temp] adjust tests
2026-05-06 17:22:44 +02:00

421 lines
12 KiB
Python

import enum
import hashlib
import logging
from pathlib import (
Path,
PurePosixPath,
)
from typing import (
Iterable,
Literal,
)
from fastapi import HTTPException
from pydantic import (
BaseModel,
ConfigDict,
Field,
)
from dump_things_service import HTTP_404_NOT_FOUND
from dump_things_service.audit.gitaudit import GitAuditBackend
from dump_things_service.backends.record_dir import (
_RecordDirStore,
RecordDirStore,
)
from dump_things_service.mapping_functions import (
MappingMethod,
mapping_functions,
)
# Module-level cache of the active configuration. `read_config()` fills it on
# first use and `store_config()` replaces it after a successful write.
g_abstract_configuration = None

# Logger registered under the name 'dump_things_service'.
logger = logging.getLogger('dump_things_service')
class StrictModel(BaseModel):
    """Base class for configuration models with strict field handling.

    ``extra='forbid'`` makes validation fail on fields that the model does
    not declare; ``use_enum_values=True`` stores the value of an enum member
    instead of the member itself.
    """

    model_config = ConfigDict(extra='forbid', use_enum_values=True)
class ConfigAuthSpec(BaseModel):
    """Authentication source specification of type ``'config'``."""

    # Tag identifying this kind of auth source; 'config' is the only
    # accepted value and also the default.
    type: Literal['config'] = 'config'
class ForgejoAuthSpec(BaseModel):
    """Authentication source specification of type ``'forgejo'``.

    All fields except ``repository`` are required.
    """

    # Tag identifying this kind of auth source (no default, must be given).
    type: Literal['forgejo']
    # NOTE(review): presumably the base URL of the Forgejo instance -- confirm.
    url: str
    organization: str
    team: str
    # Either 'team' or 'user'.
    label_type: Literal['team', 'user']
    # Optional; omitted repositories are represented as None.
    repository: str | None = None
class TagSpec(BaseModel):
    """IRIs used to tag submissions with a submitter id and a submission time.

    Both fields are optional and fall back to the IRIs given below.
    """

    submitter_id_tag: str = (
        'http://purl.obolibrary.org/obo/NCIT_C54269'
    )
    submission_time_tag: str = (
        'http://semanticscience.org/resource/SIO_001083'
    )
class RecordDirBackendConfig(StrictModel):
    """Backend configuration for the ``record_dir`` backend types."""

    model_config = ConfigDict(
        use_enum_values=True,
    )

    # Backend type, optionally with the '+stl' suffix.
    type: Literal['record_dir', 'record_dir+stl']
    # Stored as the string value of a `MappingMethod` member; defaults to
    # the value of `MappingMethod.digest_md5`.
    mapping_method: str = MappingMethod.digest_md5.value
class SQLiteBackendConfig(StrictModel):
    """Backend configuration for the ``sqlite`` backend types."""

    # Backend type, optionally with the '+stl' suffix.
    type: Literal['sqlite', 'sqlite+stl']
class GitAuditBackendConfig(StrictModel):
    """Configuration of an audit backend of type ``'gitaudit'``."""

    type: Literal['gitaudit']
    # Location used by the audit backend (required).
    path: Path
    # Defaults to 60.
    # NOTE(review): the unit is presumably seconds -- confirm against the
    # audit backend implementation.
    auto_flush_timeout: int = 60
class CollectionConfig(BaseModel):
    """Configuration of a single collection.

    Unknown fields are rejected and enum members are stored by value.
    ``name``, ``default_token``, ``curated`` and ``schema`` are required;
    every other field has a default.
    """

    model_config = ConfigDict(
        extra='forbid',
        use_enum_values=True,
    )

    # --- required fields ---
    name: str
    # Name of the collection's default token (returned by
    # `get_default_token_name()` and resolved by `get_default_token_config()`).
    default_token: str
    curated: PurePosixPath
    schema: str

    # --- optional fields ---
    incoming: PurePosixPath | None = None
    # Defaults to a record-dir backend of type 'record_dir+stl'.
    backend: RecordDirBackendConfig | SQLiteBackendConfig = (
        RecordDirBackendConfig(type='record_dir+stl')
    )
    # Defaults to a single 'config' auth source.
    auth_sources: list[ForgejoAuthSpec | ConfigAuthSpec] = [ConfigAuthSpec()]
    audit_backends: list[GitAuditBackendConfig] = []
    submission_tags: TagSpec = TagSpec()
    use_classes: list[str] = []
    ignore_classes: list[str] = []
class RecordDirConfigFileContent(BaseModel):
    """Expected content of a record-directory configuration file.

    Unknown fields are rejected. All fields are required.
    NOTE(review): presumably this describes the config file kept inside a
    record-dir store -- confirm against the record_dir backend.
    """

    model_config = ConfigDict(
        extra='forbid',
    )

    # Only the literal values below are accepted for these three fields.
    type: Literal['records']
    version: Literal[1]
    format: Literal['yaml']

    schema: str
    # Must be a valid `MappingMethod`.
    idfx: MappingMethod
class TokenModes(enum.Enum):
    """Access modes that can be assigned to a token for a collection.

    Every member's value equals its name. `mode_mapping` associates each
    mode with a `TokenPermission`.
    """

    # Read-oriented modes
    READ_CURATED = 'READ_CURATED'
    READ_COLLECTION = 'READ_COLLECTION'
    WRITE_COLLECTION = 'WRITE_COLLECTION'
    READ_SUBMISSIONS = 'READ_SUBMISSIONS'
    WRITE_SUBMISSIONS = 'WRITE_SUBMISSIONS'

    # Submission modes
    SUBMIT = 'SUBMIT'
    SUBMIT_ONLY = 'SUBMIT_ONLY'

    # No access at all
    NOTHING = 'NOTHING'

    # Privileged modes
    CURATOR = 'CURATOR'
    ADMIN = 'ADMIN'
class TokenPermission(BaseModel):
    """Set of boolean permission flags; every flag defaults to ``False``."""

    # Access to curated content
    curated_read: bool = False
    # Access to incoming content
    incoming_read: bool = False
    incoming_write: bool = False
    # Elevated permissions
    curated_write: bool = False
    zones_access: bool = False
    admin: bool = False
class TokenCollectionConfig(StrictModel):
    """Settings of one token for one collection."""

    model_config = ConfigDict(
        extra='forbid',
        use_enum_values=True,
    )

    # Access mode. Because of `use_enum_values`, the validated attribute
    # holds the enum value, which `get_permissions()` converts back.
    mode: TokenModes
    # Required field, validated in strict mode.
    incoming_label: str = Field(strict=True)
class TokenConfig(StrictModel):
    """Configuration of a single token."""

    user_id: str
    # Per-collection settings, keyed by collection name.
    collections: dict[str, TokenCollectionConfig]
    # When True, `representation` is compared against the SHA-1 hex digest
    # of a presented token (see `get_token_info_by_representation()`);
    # otherwise it is compared against the presented token itself.
    hashed: bool = False
    representation: str = ''
# IRI under which the configuration record is stored and looked up.
dump_things_config_iri = 'dump_things:config'

# Locations, relative to a store path, of the configuration record store and
# of the configuration audit log.
dump_things_private_path = Path('__dump_things__')
config_backend_path = dump_things_private_path.joinpath('config_store')
config_audit_path = dump_things_private_path.joinpath('config_audit')

# Lazily created singletons; `get_config_backends()` populates them.
config_backend = None
config_audit = None
class Configuration(BaseModel):
    """Top-level configuration: collections and tokens, each keyed by name."""

    # Collection name -> collection configuration.
    collections: dict[str, CollectionConfig] = {}
    # Token name -> token configuration.
    tokens: dict[str, TokenConfig] = {}
    # Identifier of the configuration record; defaults to the fixed
    # configuration IRI.
    pid: str = dump_things_config_iri
# Permission flags granted by each token mode. Flags that are not listed for
# a mode keep the `TokenPermission` default (False).
mode_mapping = {
    # Modes that can read curated and/or incoming content
    TokenModes.READ_CURATED: TokenPermission(curated_read=True),
    TokenModes.READ_COLLECTION: TokenPermission(
        curated_read=True, incoming_read=True
    ),
    TokenModes.WRITE_COLLECTION: TokenPermission(
        curated_read=True, incoming_read=True, incoming_write=True
    ),
    # Modes restricted to incoming content
    TokenModes.READ_SUBMISSIONS: TokenPermission(incoming_read=True),
    TokenModes.WRITE_SUBMISSIONS: TokenPermission(
        incoming_read=True, incoming_write=True
    ),
    # Submission modes
    TokenModes.SUBMIT: TokenPermission(
        curated_read=True, incoming_write=True
    ),
    TokenModes.SUBMIT_ONLY: TokenPermission(incoming_write=True),
    # No permissions at all
    TokenModes.NOTHING: TokenPermission(),
    # Everything except `admin`
    TokenModes.CURATOR: TokenPermission(
        curated_read=True,
        incoming_read=True,
        incoming_write=True,
        curated_write=True,
        zones_access=True,
    ),
    # Every flag set
    TokenModes.ADMIN: TokenPermission(
        curated_read=True,
        incoming_read=True,
        incoming_write=True,
        curated_write=True,
        zones_access=True,
        admin=True,
    ),
}
def get_permissions(mode: str) -> TokenPermission:
    """Return the permission flags that belong to a token mode.

    :param mode: value of a `TokenModes` member, e.g. ``'SUBMIT'``
    :return: the `TokenPermission` registered for that mode in `mode_mapping`
    :raises ValueError: if ``mode`` is not a valid `TokenModes` value
    """
    token_mode = TokenModes(mode)
    return mode_mapping[token_mode]
def get_config_backends(
    store_path: Path,
) -> tuple[_RecordDirStore, GitAuditBackend]:
    """Return the record store and audit backend that hold the configuration.

    Both backends are module-level singletons that are created on first use.
    The directories ``store_path / config_backend_path`` and
    ``store_path / config_audit_path`` are created if they are missing.

    NOTE: once the singletons exist, they are returned unchanged, even if a
    later call passes a different ``store_path``.

    :param store_path: root directory of the store
    :return: tuple ``(config_backend, config_audit)``
    :raises FileExistsError: if one of the two locations exists but is not
        a directory
    """
    global config_audit
    global config_backend

    config_path = store_path / config_backend_path
    # `exist_ok=True` avoids the race between an `exists()` check and the
    # subsequent `mkdir()` when several workers start concurrently.
    config_path.mkdir(parents=True, exist_ok=True)
    if config_backend is None:
        config_backend = RecordDirStore(
            config_path,
            mapping_functions[MappingMethod.digest_md5],
            'yaml',
        )

    audit_path = store_path / config_audit_path
    audit_path.mkdir(parents=True, exist_ok=True)
    if config_audit is None:
        config_audit = GitAuditBackend(audit_path)

    return config_backend, config_audit
def read_config(
    store_path: Path,
) -> Configuration:
    """Return the active configuration, loading it on first use.

    If no configuration is cached in `g_abstract_configuration`, the record
    stored under `dump_things_config_iri` is read from the configuration
    backend. When no such record exists, an empty `Configuration` is used.
    The result is cached for subsequent calls.

    :param store_path: root directory of the store
    :return: the cached or freshly loaded configuration
    """
    global g_abstract_configuration

    # Fast path: a configuration has already been loaded or stored.
    if g_abstract_configuration:
        return g_abstract_configuration

    record_store, _ = get_config_backends(store_path)
    stored_record = record_store.get_record_by_iri(dump_things_config_iri)
    if stored_record:
        g_abstract_configuration = Configuration(**(stored_record.json_object))
    else:
        g_abstract_configuration = Configuration()
    return g_abstract_configuration
def store_config(
    store_path,
    config: Configuration,
):
    """Persist ``config`` and make it the active configuration.

    The configuration is dumped in JSON mode (``None`` values excluded), its
    ``pid`` is forced to `dump_things_config_iri`, and the result is written
    to the configuration record store and to the audit backend. Afterwards
    the module-level cache is replaced with ``config``.

    :param store_path: root directory of the store
    :param config: configuration to persist
    """
    global g_abstract_configuration

    record_store, audit_log = get_config_backends(store_path)

    payload = config.model_dump(mode='json', exclude_none=True)
    # The stored record is always identified by the fixed configuration IRI.
    payload['pid'] = dump_things_config_iri

    record_store.add_record(
        iri=dump_things_config_iri,
        class_name='DumpThingsConfig',
        json_object=payload,
    )
    audit_log.add_record(
        record=payload,
        committer_id='__dump_things_server__',
    )

    # Update the cache only after both writes went through.
    g_abstract_configuration = config
def tokens_for_collection(
    config: Configuration,
    collection: str,
) -> Iterable[TokenConfig]:
    """Yield every token configuration that has an entry for ``collection``.

    :param config: configuration to search
    :param collection: name of the collection
    :return: lazily generated token configurations, in the iteration order
        of ``config.tokens``
    """
    for token_config in config.tokens.values():
        if collection in token_config.collections:
            yield token_config
def get_zone(
    configuration: Configuration,
    collection: str,
    token: str,
) -> str | None:
    """Get the zone for the given collection and token.

    The zone is the ``incoming_label`` that the token configuration defines
    for the collection.

    NOTE(review): ``token`` is interpreted as a token *name*, i.e. a key of
    ``configuration.tokens`` -- confirm that callers do not pass the token
    representation instead.

    :param configuration: configuration to search
    :param collection: name of the collection
    :param token: name of the token
    :return: the incoming label of the token in the collection
    :raises HTTPException: with status 404 if the collection does not exist,
        or if the token is unknown or has no entry for the collection
    """
    check_collection(configuration, collection)

    token_config = configuration.tokens.get(token)
    if token_config is None or collection not in token_config.collections:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f'Missing incoming_label for given token in collection: {collection}',
        )
    return token_config.collections[collection].incoming_label
def check_collection(
    abstract_config: Configuration,
    collection: str,
):
    """Ensure that ``collection`` is defined in ``abstract_config``.

    :param abstract_config: configuration to check
    :param collection: name of the collection
    :raises HTTPException: with status 404 if the collection is not defined
    """
    if collection in abstract_config.collections:
        return
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection}'.",
    )
def check_label(
    abstract_config: Configuration,
    collection: str,
    label: str,
):
    """Check that a label exists in a collection configuration or on disk

    The configured labels are consulted first; the on-disk labels are only
    looked up if the label is not configured.

    :param abstract_config: configuration to check
    :param collection: name of the collection
    :param label: incoming label to look for
    :raises HTTPException: with status 404 if the label is found in neither
        place (or, via `get_config_labels()`, if the collection is unknown)
    """
    if label in get_config_labels(abstract_config, collection):
        return
    # NOTE(review): `get_on_disk_labels` is not defined or imported in the
    # part of the module visible here -- confirm that it is available.
    if label in get_on_disk_labels(abstract_config, collection):
        return
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No incoming label: '{label}' in collection: '{collection}'.",
    )
def get_config_labels(
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the non-empty incoming labels configured for ``collection``.

    :param abstract_config: configuration to search
    :param collection: name of the collection
    :return: set of incoming labels of all tokens that have an entry for the
        collection; empty labels are skipped
    :raises HTTPException: with status 404 if the collection is not defined
    """
    check_collection(abstract_config, collection)

    labels: set[str] = set()
    for token in tokens_for_collection(abstract_config, collection):
        incoming_label = token.collections[collection].incoming_label
        if incoming_label:
            labels.add(incoming_label)
    return labels
def get_default_token_name(
    abstract_config: Configuration,
    collection: str
) -> str:
    """Return the name of the default token of ``collection``.

    :param abstract_config: configuration to search
    :param collection: name of the collection
    :return: the ``default_token`` entry of the collection configuration
    :raises HTTPException: with status 404 if the collection is not defined
    """
    check_collection(abstract_config, collection)
    collection_config = abstract_config.collections[collection]
    return collection_config.default_token
def get_token_info_by_representation(
    abstract_config: Configuration,
    token_representation: str,
) -> tuple[str, TokenConfig] | None:
    """Get the name and config of the token given in `token_representation`

    For tokens flagged as ``hashed``, the SHA-1 hex digest of the presented
    token is compared with the stored representation; for all other tokens
    the presented token itself is compared.

    :param abstract_config: configuration to search
    :param token_representation: token as presented by the client
    :return: ``(token_name, token_config)`` of the first matching token in
        the iteration order of ``abstract_config.tokens``, or ``None`` if no
        token matches
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    presented_plain = token_representation.encode()
    presented_hashed = hashlib.sha1(presented_plain).hexdigest().encode()

    for token_name, token_config in abstract_config.tokens.items():
        presented = presented_hashed if token_config.hashed else presented_plain
        # Compare in constant time so that response timing does not reveal
        # how much of a secret token matched. Bytes are compared because
        # `compare_digest` rejects `str` arguments with non-ASCII characters.
        if hmac.compare_digest(presented, token_config.representation.encode()):
            return token_name, token_config
    return None
def get_token_config_by_name(
    abstract_config: Configuration,
    token_name: str,
) -> TokenConfig | None:
    """Look up a token configuration by token name.

    :param abstract_config: configuration to search
    :param token_name: name of the token
    :return: the token configuration, or ``None`` if no such token exists
    """
    known_tokens = abstract_config.tokens
    return known_tokens[token_name] if token_name in known_tokens else None
def get_token_infos_for_collection(
    abstract_config: Configuration,
    collection_name: str,
) -> Iterable[tuple[str, TokenConfig, TokenCollectionConfig]]:
    """Yield information about every token that has access to a collection.

    :param abstract_config: configuration to search
    :param collection_name: name of the collection
    :return: lazily generated tuples ``(token_name, token_config,
        token_collection_config)``, one per token that defines an entry for
        ``collection_name``, in the iteration order of
        ``abstract_config.tokens``
    """
    for token_name, token_config in abstract_config.tokens.items():
        # `.get()` returns the single per-collection config (or None); it is
        # not a container to iterate over.
        token_collection_config = token_config.collections.get(collection_name)
        if token_collection_config is not None:
            # Yield directly instead of collecting into a set: the tuples
            # contain model instances, which need not be hashable.
            yield token_name, token_config, token_collection_config
def get_token_config_for_representation_and_collection(
    abstract_config: Configuration,
    collection_name: str,
    token_representation: str,
) -> tuple[str, TokenConfig, TokenCollectionConfig] | None:
    """Resolve a presented token to its settings for one collection.

    :param abstract_config: configuration to search
    :param collection_name: name of the collection
    :param token_representation: token as presented by the client
    :return: ``(token_name, token_config, token_collection_config)``, or
        ``None`` if the token is unknown or has no entry for the collection
    """
    match = get_token_info_by_representation(
        abstract_config=abstract_config,
        token_representation=token_representation,
    )
    if not match:
        return None

    token_name, token_config = match
    if collection_name not in token_config.collections:
        return None
    return token_name, token_config, token_config.collections[collection_name]
def get_collection_config_by_name(
    abstract_config: Configuration,
    collection_name: str,
) -> CollectionConfig:
    """Return the configuration of the collection named ``collection_name``.

    :param abstract_config: configuration to search
    :param collection_name: name of the collection
    :return: the collection configuration
    :raises HTTPException: with status 404 if no such collection exists
    """
    found = abstract_config.collections.get(collection_name)
    if found:
        return found
    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection_name}'",
    )
def get_default_token_config(
    abstract_config: Configuration,
    collection: str,
) -> TokenConfig:
    """Return the configuration of the default token of ``collection``.

    NOTE(review): the lookup goes through `get_token_config_by_name()`, so
    the result is ``None`` when the collection's ``default_token`` does not
    name a configured token, despite the return annotation.

    :param abstract_config: configuration to search
    :param collection: name of the collection
    :return: configuration of the collection's default token
    :raises HTTPException: with status 404 if the collection is not defined
    """
    collection_config = get_collection_config_by_name(
        abstract_config,
        collection,
    )
    token_name = collection_config.default_token
    return get_token_config_by_name(abstract_config, token_name)
def get_mapping_function(record_dir_backend_config: RecordDirBackendConfig):
    """Return the mapping function selected by a record-dir backend config.

    The configured ``mapping_method`` is converted to a `MappingMethod`
    member, which is then looked up in `mapping_functions`.

    :param record_dir_backend_config: backend configuration to evaluate
    :return: the entry of `mapping_functions` for the configured method
    :raises ValueError: if ``mapping_method`` is not a valid `MappingMethod`
    """
    method = MappingMethod(record_dir_backend_config.mapping_method)
    return mapping_functions[method]
def get_backend_and_extension(backend_type: str) -> tuple[str, str]:
    """Split a backend type such as ``'record_dir+stl'`` at ``'+'``.

    :param backend_type: backend type, optionally followed by ``'+'`` and an
        extension
    :return: ``(backend, extension)``; the extension is ``''`` if the type
        contains no ``'+'``. Parts after a second ``'+'`` are ignored.
    """
    parts = backend_type.split('+')
    if len(parts) > 1:
        return parts[0], parts[1]
    return parts[0], ''