"""Configuration models and lookup helpers for the dump-things service."""
import enum
import hashlib
import hmac
import logging
from pathlib import (
    Path,
    PurePosixPath,
)
from typing import (
    Iterable,
    Literal,
)

from fastapi import HTTPException
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
)

from dump_things_service import HTTP_404_NOT_FOUND
from dump_things_service.audit.gitaudit import GitAuditBackend
from dump_things_service.backends.record_dir import (
    _RecordDirStore,
    RecordDirStore,
)
from dump_things_service.mapping_functions import (
    MappingMethod,
    mapping_functions,
)
|
|
|
|
|
|
# Logger registered under the package name `dump_things_service`.
logger = logging.getLogger('dump_things_service')

# Module-level cache of the parsed configuration. `read_config()` fills it on
# first use and `store_config()` replaces it after a successful write.
g_abstract_configuration = None
|
|
|
|
|
|
class StrictModel(BaseModel):
    """Base model with `extra='forbid'` and `use_enum_values=True`.

    Subclasses inherit this pydantic configuration: fields that are not
    declared on the model are rejected, and enum-typed fields are populated
    with the enum's value.
    """

    model_config = ConfigDict(extra='forbid', use_enum_values=True)
|
|
|
|
|
|
class ConfigAuthSpec(BaseModel):
    """Authentication source entry of type `'config'`."""

    # Type tag; `'config'` is the only accepted value and also the default,
    # so `ConfigAuthSpec()` can be built without arguments.
    type: Literal['config'] = 'config'
|
|
|
|
|
|
class ForgejoAuthSpec(BaseModel):
    """Authentication source entry of type `'forgejo'`.

    All fields except `repository` are required.
    """

    # Type tag; must be given explicitly (no default).
    type: Literal['forgejo']
    # presumably the base URL of the Forgejo instance -- TODO confirm
    url: str
    organization: str
    team: str
    # Either 'team' or 'user'.
    label_type: Literal['team', 'user']
    # Optional; defaults to None.
    repository: str | None = None
|
|
|
|
|
|
class TagSpec(BaseModel):
    """Tag identifiers for submitter id and submission time.

    Both fields hold IRI strings and fall back to the defaults below.
    """

    submitter_id_tag: str = (
        'http://purl.obolibrary.org/obo/NCIT_C54269'
    )
    submission_time_tag: str = (
        'http://semanticscience.org/resource/SIO_001083'
    )
|
|
|
|
|
|
class RecordDirBackendConfig(StrictModel):
    """Backend configuration for `'record_dir'` and `'record_dir+stl'`."""

    model_config = ConfigDict(
        use_enum_values=True,
    )

    # Backend type tag; a `+stl` suffix is accepted as part of the type.
    type: Literal['record_dir', 'record_dir+stl']
    # Stored as the string value of a `MappingMethod` member; the default is
    # the value of `MappingMethod.digest_md5`.
    mapping_method: str = MappingMethod.digest_md5.value
|
|
|
|
|
|
class SQLiteBackendConfig(StrictModel):
    """Backend configuration for `'sqlite'` and `'sqlite+stl'`."""

    # Backend type tag; a `+stl` suffix is accepted as part of the type.
    type: Literal['sqlite', 'sqlite+stl']
|
|
|
|
|
|
class GitAuditBackendConfig(StrictModel):
    """Audit backend configuration of type `'gitaudit'`."""

    type: Literal['gitaudit']
    # Location used by the audit backend (required).
    path: Path
    # Defaults to 60; presumably seconds -- TODO confirm the unit.
    auto_flush_timeout: int = 60
|
|
|
|
|
|
class CollectionConfig(BaseModel):
    """Configuration of a single collection.

    `name`, `default_token`, `curated` and `schema` are required; every
    other field has a default. Unknown fields are rejected.
    """

    model_config = ConfigDict(
        extra='forbid',
        use_enum_values=True,
    )

    # --- required ---
    name: str
    # Name of the token that `get_default_token_name()` returns for this
    # collection.
    default_token: str
    curated: PurePosixPath
    schema: str

    # --- optional ---
    incoming: PurePosixPath | None = None
    # Record-dir or SQLite backend; defaults to `record_dir+stl`.
    backend: RecordDirBackendConfig | SQLiteBackendConfig = (
        RecordDirBackendConfig(type='record_dir+stl')
    )
    # Defaults to a single `config` authentication source.
    auth_sources: list[ForgejoAuthSpec | ConfigAuthSpec] = [
        ConfigAuthSpec(),
    ]
    audit_backends: list[GitAuditBackendConfig] = []
    submission_tags: TagSpec = TagSpec()
    use_classes: list[str] = []
    ignore_classes: list[str] = []
|
|
|
|
|
|
class RecordDirConfigFileContent(BaseModel):
    """Model of the content of a record-directory configuration file.

    Only `type='records'`, `version=1` and `format='yaml'` are accepted;
    unknown fields are rejected.
    """

    model_config = ConfigDict(
        extra='forbid',
    )

    type: Literal['records']
    version: Literal[1]
    schema: str
    format: Literal['yaml']
    # Must be a member (or value) of `MappingMethod`.
    idfx: MappingMethod
|
|
|
|
|
|
class TokenModes(enum.Enum):
    """Named access modes a token can have on a collection.

    Each member's value equals its name. The permissions granted by a mode
    are defined in the module-level `mode_mapping` table.
    """

    # Read-oriented modes
    READ_CURATED = 'READ_CURATED'
    READ_COLLECTION = 'READ_COLLECTION'
    WRITE_COLLECTION = 'WRITE_COLLECTION'
    READ_SUBMISSIONS = 'READ_SUBMISSIONS'
    WRITE_SUBMISSIONS = 'WRITE_SUBMISSIONS'

    # Submission modes
    SUBMIT = 'SUBMIT'
    SUBMIT_ONLY = 'SUBMIT_ONLY'

    # No access at all
    NOTHING = 'NOTHING'

    # Privileged modes
    CURATOR = 'CURATOR'
    ADMIN = 'ADMIN'
|
|
|
|
|
|
class TokenPermission(BaseModel):
    """Set of boolean permission flags; every flag defaults to `False`."""

    # Curated area
    curated_read: bool = False
    # Incoming area
    incoming_read: bool = False
    incoming_write: bool = False
    # Elevated rights
    curated_write: bool = False
    zones_access: bool = False
    admin: bool = False
|
|
|
|
|
|
class TokenCollectionConfig(StrictModel):
    """Per-collection settings of a token: access mode and incoming label."""

    model_config = ConfigDict(
        extra='forbid',
        use_enum_values=True,
    )

    # With `use_enum_values=True` the stored value is the mode's string value.
    mode: TokenModes
    # Required; validated in strict mode, so only real `str` values pass.
    incoming_label: str = Field(strict=True)
|
|
|
|
|
|
class TokenConfig(StrictModel):
    """Configuration of one token."""

    user_id: str
    # Maps a collection name to the token's settings in that collection.
    collections: dict[str, TokenCollectionConfig]
    # When True, `representation` is compared against the SHA-1 hex digest of
    # a presented token (see `get_token_info_by_representation`).
    hashed: bool = False
    representation: str = ''
|
|
|
|
|
|
# IRI (and `pid`) under which the service configuration record is stored.
dump_things_config_iri = 'dump_things:config'

# Private directory, relative to the store path, that holds the configuration
# record store and its audit log.
dump_things_private_path = Path('__dump_things__')
config_backend_path = dump_things_private_path / 'config_store'
config_audit_path = dump_things_private_path / 'config_audit'

# Backend instances, created on first use by `get_config_backends()`.
config_backend = None
config_audit = None
|
|
|
|
|
|
class Configuration(BaseModel):
    """Top-level service configuration: collections and tokens by name.

    An instance built without arguments has no collections and no tokens.
    """

    # Collection name -> collection configuration.
    collections: dict[str, CollectionConfig] = {}
    # Token name -> token configuration.
    tokens: dict[str, TokenConfig] = {}
    # Identifier of the configuration record; defaults to the fixed IRI.
    pid: str = dump_things_config_iri
|
|
|
|
|
|
# Permission flags granted by each token mode. Flags that are not listed for
# a mode keep their `TokenPermission` default (`False`).
mode_mapping = {
    # Read-oriented modes
    TokenModes.READ_CURATED: TokenPermission(
        curated_read=True,
    ),
    TokenModes.READ_COLLECTION: TokenPermission(
        curated_read=True, incoming_read=True,
    ),
    TokenModes.WRITE_COLLECTION: TokenPermission(
        curated_read=True, incoming_read=True, incoming_write=True,
    ),
    TokenModes.READ_SUBMISSIONS: TokenPermission(
        incoming_read=True,
    ),
    TokenModes.WRITE_SUBMISSIONS: TokenPermission(
        incoming_read=True, incoming_write=True,
    ),
    # Submission modes
    TokenModes.SUBMIT: TokenPermission(
        curated_read=True,
        incoming_write=True,
    ),
    TokenModes.SUBMIT_ONLY: TokenPermission(
        incoming_write=True,
    ),
    # No permissions at all
    TokenModes.NOTHING: TokenPermission(),
    # Privileged modes; ADMIN is CURATOR plus the `admin` flag
    TokenModes.CURATOR: TokenPermission(
        curated_read=True, incoming_read=True, incoming_write=True,
        curated_write=True, zones_access=True,
    ),
    TokenModes.ADMIN: TokenPermission(
        curated_read=True, incoming_read=True, incoming_write=True,
        curated_write=True, zones_access=True, admin=True,
    ),
}
|
|
|
|
|
|
def get_permissions(mode: str) -> TokenPermission:
    """Return the permission set that belongs to the token mode `mode`.

    `mode` is converted with `TokenModes(mode)`, so a value that is not a
    valid mode raises `ValueError`.
    """
    token_mode = TokenModes(mode)
    return mode_mapping[token_mode]
|
|
|
|
|
|
def get_config_backends(
    store_path: Path,
) -> tuple[_RecordDirStore, GitAuditBackend]:
    """Return the record store and audit backend for the configuration.

    Both backends live below `store_path` in the private directories
    `config_backend_path` and `config_audit_path`; the directories are
    created if they are missing.

    The backend objects are created once and cached in the module globals
    `config_backend` and `config_audit`. Later calls return the cached
    objects even when a different `store_path` is passed.

    :param store_path: root directory of the store
    :return: tuple `(config_backend, config_audit)`
    :raises FileExistsError: if one of the private paths exists but is not a
        directory
    """
    global config_audit
    global config_backend

    config_path = store_path / config_backend_path
    # `exist_ok=True` avoids the race between a separate `exists()` check and
    # the directory creation.
    config_path.mkdir(parents=True, exist_ok=True)
    if config_backend is None:
        config_backend = RecordDirStore(
            config_path,
            mapping_functions[MappingMethod.digest_md5],
            'yaml',
        )

    audit_path = store_path / config_audit_path
    audit_path.mkdir(parents=True, exist_ok=True)
    if config_audit is None:
        config_audit = GitAuditBackend(audit_path)

    return config_backend, config_audit
|
|
|
|
|
|
def read_config(
    store_path: Path,
) -> Configuration:
    """Return the service configuration, loading it on first use.

    The result is cached in `g_abstract_configuration`. When no cached value
    exists, the record stored under `dump_things_config_iri` is read from the
    configuration backend; if there is no such record, an empty
    `Configuration` is used.

    :param store_path: root directory of the store
    :return: the cached or freshly loaded configuration
    """
    global g_abstract_configuration

    # Fast path: configuration was already loaded or stored.
    if g_abstract_configuration:
        return g_abstract_configuration

    record_store, _ = get_config_backends(store_path)
    stored_record = record_store.get_record_by_iri(dump_things_config_iri)
    if stored_record:
        g_abstract_configuration = Configuration(**stored_record.json_object)
    else:
        g_abstract_configuration = Configuration()
    return g_abstract_configuration
|
|
|
|
|
|
def store_config(
    store_path,
    config: Configuration,
):
    """Persist `config` and make it the cached configuration.

    The configuration is dumped in JSON mode without `None` values, its
    `pid` is forced to `dump_things_config_iri`, and the result is written to
    the configuration record store and to the audit backend. The module-level
    cache is updated only after both writes have returned.

    :param store_path: root directory of the store
    :param config: configuration to persist
    """
    global g_abstract_configuration

    record_store, audit_log = get_config_backends(store_path)

    payload = config.model_dump(mode='json', exclude_none=True)
    # The configuration record always lives under the fixed IRI.
    payload['pid'] = dump_things_config_iri

    record_store.add_record(
        iri=dump_things_config_iri,
        class_name='DumpThingsConfig',
        json_object=payload,
    )
    audit_log.add_record(
        record=payload,
        committer_id='__dump_things_server__',
    )

    g_abstract_configuration = config
|
|
|
|
|
|
def tokens_for_collection(
    config: Configuration,
    collection: str,
) -> Iterable[TokenConfig]:
    """Yield every token configuration that has an entry for `collection`.

    Tokens are yielded lazily, in the iteration order of `config.tokens`.
    """
    for candidate in config.tokens.values():
        if collection in candidate.collections:
            yield candidate
|
|
|
|
|
|
def get_zone(
    configuration: Configuration,
    collection: str,
    token: str,
) -> str | None:
    """Get the zone for the given collection and token.

    This function is currently disabled: after validating the collection it
    always fails. The lookup it used to perform referred to the name
    `instance_config`, which is not defined in this module.

    :param configuration: service configuration
    :param collection: name of the collection
    :param token: token whose zone is requested
    :raises HTTPException: 404 if `collection` is not configured
    :raises AssertionError: always, for a configured collection
    """
    check_collection(configuration, collection)

    # Raise explicitly instead of `assert False`: assert statements are
    # removed under `python -O`, which would let execution continue.
    raise AssertionError(
        'get_zone() is disabled: zone lookup is not available'
    )
|
|
|
|
|
|
def check_collection(
    abstract_config: Configuration,
    collection: str,
):
    """Ensure that `collection` is configured.

    :raises HTTPException: 404 if `collection` is not a key of
        `abstract_config.collections`
    """
    if collection in abstract_config.collections:
        return

    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection}'.",
    )
|
|
|
|
|
|
def check_label(
    abstract_config: Configuration,
    collection: str,
    label: str,
):
    """Check that a label exists in a collection configuration or on disk

    The configured labels are consulted first; the on-disk labels are only
    looked up when the label is not configured.

    :raises HTTPException: 404 if the label is found in neither place (or if
        the collection itself is unknown)
    """
    if label in get_config_labels(abstract_config, collection):
        return

    # NOTE(review): `get_on_disk_labels` is neither defined nor imported in
    # the visible part of this module -- confirm where it comes from.
    if label in get_on_disk_labels(abstract_config, collection):
        return

    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No incoming label: '{label}' in collection: '{collection}'.",
    )
|
|
|
|
|
|
def get_config_labels(
    abstract_config: Configuration,
    collection: str,
) -> set[str]:
    """Return the non-empty incoming labels configured for `collection`.

    The labels are collected from all tokens that have an entry for the
    collection.

    :raises HTTPException: 404 if `collection` is not configured
    """
    check_collection(abstract_config, collection)

    labels = set()
    for token in tokens_for_collection(abstract_config, collection):
        incoming_label = token.collections[collection].incoming_label
        # Empty labels are skipped.
        if incoming_label:
            labels.add(incoming_label)
    return labels
|
|
|
|
|
|
def get_default_token_name(
    abstract_config: Configuration,
    collection: str
) -> str:
    """Return the name of the default token of `collection`.

    :raises HTTPException: 404 if `collection` is not configured
    """
    check_collection(abstract_config, collection)
    collection_config = abstract_config.collections[collection]
    return collection_config.default_token
|
|
|
|
|
|
def get_token_info_by_representation(
    abstract_config: Configuration,
    token_representation: str,
) -> tuple[str, TokenConfig] | None:
    """Get the name and config of the token given in `token_representation`

    For tokens marked as `hashed`, the stored representation is compared
    with the SHA-1 hex digest of `token_representation`; for all other
    tokens it is compared with `token_representation` itself. The first
    matching token, in the iteration order of `abstract_config.tokens`, wins.

    :param abstract_config: service configuration
    :param token_representation: token as presented by the client
    :return: `(token_name, token_config)` or `None` if nothing matches
    """
    plain = token_representation.encode()
    # NOTE(review): SHA-1 is kept because the digest must match the stored
    # hashed representations; changing it requires migrating stored values.
    hashed = hashlib.sha1(plain).hexdigest().encode()

    for token_name, token_config in abstract_config.tokens.items():
        presented = hashed if token_config.hashed else plain
        # Constant-time comparison, so response timing does not reveal how
        # many leading characters of a secret were guessed correctly.
        # Comparing bytes keeps non-ASCII tokens working.
        if hmac.compare_digest(
            presented,
            token_config.representation.encode(),
        ):
            return token_name, token_config
    return None
|
|
|
|
|
|
def get_token_config_by_name(
    abstract_config: Configuration,
    token_name: str,
) -> TokenConfig | None:
    """Return the configuration of the token `token_name`, or `None`.

    `None` is returned when no token with that name is configured.
    """
    configured_tokens = abstract_config.tokens
    return configured_tokens.get(token_name)
|
|
|
|
|
|
def get_token_infos_for_collection(
    abstract_config: Configuration,
    collection_name: str,
) -> Iterable[tuple[str, TokenConfig, TokenCollectionConfig]]:
    """Yield info about every token that has an entry for `collection_name`.

    Each item is a tuple `(token_name, token_config,
    token_collection_config)`, where `token_collection_config` is the
    token's entry for the collection. Tokens without an entry for the
    collection are skipped. Items are yielded in the iteration order of
    `abstract_config.tokens`.

    :param abstract_config: service configuration
    :param collection_name: name of the collection
    """
    for token_name, token_config in abstract_config.tokens.items():
        # `.get()` yields a single per-collection config (or None); it must
        # not be iterated over.
        token_collection_config = token_config.collections.get(collection_name)
        if token_collection_config is not None:
            yield token_name, token_config, token_collection_config
|
|
|
|
|
|
def get_token_config_for_representation_and_collection(
    abstract_config: Configuration,
    collection_name: str,
    token_representation: str,
) -> tuple[str, TokenConfig, TokenCollectionConfig] | None:
    """Look up a token by representation, restricted to one collection.

    :return: `(token_name, token_config, token_collection_config)` if a token
        matches `token_representation` and has an entry for
        `collection_name`; otherwise `None`
    """
    match = get_token_info_by_representation(
        abstract_config=abstract_config,
        token_representation=token_representation,
    )
    if not match:
        return None

    token_name, token_config = match
    if collection_name not in token_config.collections:
        return None

    return token_name, token_config, token_config.collections[collection_name]
|
|
|
|
|
|
def get_collection_config_by_name(
    abstract_config: Configuration,
    collection_name: str,
) -> CollectionConfig:
    """Return the configuration of the collection `collection_name`.

    :raises HTTPException: 404 if no such collection is configured
    """
    found = abstract_config.collections.get(collection_name)
    if found:
        return found

    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"No such collection: '{collection_name}'",
    )
|
|
|
|
|
|
def get_default_token_config(
    abstract_config: Configuration,
    collection: str,
) -> TokenConfig:
    """Return the configuration of the default token of `collection`.

    The token is looked up by the collection's `default_token` name. If no
    token with that name is configured, the lookup yields `None`.

    :raises HTTPException: 404 if `collection` is not configured
    """
    collection_config = get_collection_config_by_name(
        abstract_config,
        collection,
    )
    return get_token_config_by_name(
        abstract_config,
        collection_config.default_token,
    )
|
|
|
|
|
|
def get_mapping_function(record_dir_backend_config: RecordDirBackendConfig):
    """Return the mapping function selected by a record-dir backend config.

    The configured `mapping_method` string is converted to a `MappingMethod`
    member, which is then used as key into `mapping_functions`.
    """
    method = MappingMethod(record_dir_backend_config.mapping_method)
    return mapping_functions[method]
|
|
|
|
|
|
def get_backend_and_extension(backend_type: str) -> tuple[str, str]:
    """Split a backend type such as `'record_dir+stl'` at `'+'`.

    :return: `(backend, extension)`; `extension` is `''` when
        `backend_type` contains no `'+'`. Only the first two `'+'`-separated
        parts are used, any further parts are ignored.
    """
    backend, *extensions = backend_type.split('+')
    extension = extensions[0] if extensions else ''
    return backend, extension
|