155 lines
4.5 KiB
Python
155 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import logging
|
|
from functools import cache
|
|
from pathlib import Path
|
|
from types import ModuleType
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
)
|
|
|
|
import yaml
|
|
from fastapi import FastAPI
|
|
from linkml_runtime import SchemaView
|
|
from pydantic import ValidationError
|
|
from yaml.scanner import ScannerError
|
|
|
|
from dump_things_service.abstract_config import (
|
|
RecordDirConfigFileContent,
|
|
MappingMethod,
|
|
mapping_functions,
|
|
)
|
|
|
|
from dump_things_service.converter import get_conversion_objects
|
|
from dump_things_service.exceptions import ConfigError
|
|
from dump_things_service.model import (
|
|
get_model_for_schema,
|
|
get_schema_model_for_schema,
|
|
get_schema_view,
|
|
)
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
|
|
record_dir_config_file_name = '.dumpthings.yaml'
|
|
ignored_files = {'.', '..', record_dir_config_file_name}
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class PydanticModuleInfo:
|
|
module: ModuleType
|
|
module_var_name: str
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class SchemaInfo:
|
|
schema_view: SchemaView
|
|
classes: list[str]
|
|
pydantic_module_info: PydanticModuleInfo
|
|
python_module: ModuleType
|
|
conversion_objects: tuple[Any, Any]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class InstanceStateCollectionInfo:
|
|
active_classes: set[str]
|
|
tag_info: dict[str, str]
|
|
|
|
|
|
@cache
|
|
def get_schema_info(schema_location: str):
|
|
module, classes, module_var_name = get_model_for_schema(schema_location)
|
|
return SchemaInfo(
|
|
schema_view=get_schema_view(schema_location),
|
|
classes=classes,
|
|
pydantic_module_info=PydanticModuleInfo(
|
|
module=module,
|
|
module_var_name=module_var_name,
|
|
),
|
|
python_module=get_schema_model_for_schema(schema_location),
|
|
conversion_objects=get_conversion_objects(schema_location),
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class InstanceState:
|
|
# foundational information from command line or initialization code
|
|
store_path: Path
|
|
bootstrap_token: str | None
|
|
|
|
# Dynamically created elements
|
|
fastapi_app: FastAPI
|
|
|
|
# Influenced by maintainer interface
|
|
maintenance_mode: set = dataclasses.field(default_factory=set)
|
|
|
|
# Created based on abstract configuration
|
|
collections: dict[str, InstanceStateCollectionInfo] = dataclasses.field(default_factory=dict)
|
|
tokens: dict = dataclasses.field(default_factory=dict)
|
|
auth_sources: dict[str, list] = dataclasses.field(default_factory=dict)
|
|
audit_backends: dict[str, list] = dataclasses.field(default_factory=dict)
|
|
curated_stores: dict = dataclasses.field(default_factory=dict)
|
|
incoming_stores: dict = dataclasses.field(default_factory=dict)
|
|
schema_info: dict[str, SchemaInfo] = dataclasses.field(default_factory=dict)
|
|
validators: dict = dataclasses.field(default_factory=dict)
|
|
order_by: list[str] = dataclasses.field(default_factory=list)
|
|
|
|
|
|
g_instance_state:InstanceState | None = None
|
|
|
|
|
|
def create_instance_state(
|
|
store_path: Path,
|
|
bootstrap_token: str,
|
|
fastapi_app: FastAPI,
|
|
) -> InstanceState:
|
|
global g_instance_state
|
|
|
|
if g_instance_state:
|
|
logger.warning('create_instance_state() already called')
|
|
else:
|
|
g_instance_state = InstanceState(
|
|
store_path=store_path,
|
|
bootstrap_token=bootstrap_token,
|
|
fastapi_app=fastapi_app,
|
|
)
|
|
return g_instance_state
|
|
|
|
|
|
def get_instance_state() -> InstanceState:
|
|
global g_instance_state
|
|
|
|
if not g_instance_state:
|
|
msg = 'get_instance_state() called before create_instance_state()'
|
|
raise RuntimeError(msg)
|
|
return g_instance_state
|
|
|
|
|
|
def get_record_dir_config(
|
|
path: Path,
|
|
file_name: str = record_dir_config_file_name,
|
|
) -> RecordDirConfigFileContent:
|
|
config_path = path / file_name
|
|
if not config_path.exists():
|
|
msg = f'Config file does not exist: {config_path}'
|
|
raise ConfigError(msg)
|
|
try:
|
|
return RecordDirConfigFileContent(
|
|
**yaml.load(config_path.read_text(), Loader=yaml.SafeLoader)
|
|
)
|
|
except ScannerError as e:
|
|
msg = f'YAML-error while reading config file {config_path}: {e}'
|
|
raise ConfigError(msg) from e
|
|
except ValidationError as e:
|
|
msg = f'Pydantic-error reading config file {config_path}: {e}'
|
|
raise ConfigError(msg) from e
|
|
|
|
|
|
def get_mapping_function_by_name(mapping_function_name: str) -> Callable:
|
|
return mapping_functions[MappingMethod(mapping_function_name)]
|
|
|
|
|
|
def get_mapping_function(collection_config: RecordDirConfigFileContent):
|
|
return mapping_functions[collection_config.idfx]
|