dump-things-server/dump_things_service/instance_state.py
Christian Monch b4ca7f555b refactor imports
2026-06-12 12:41:54 +02:00

155 lines
4.5 KiB
Python

from __future__ import annotations
import dataclasses
import logging
from functools import cache
from pathlib import Path
from types import ModuleType
from typing import (
Any,
Callable,
)
import yaml
from fastapi import FastAPI
from linkml_runtime import SchemaView
from pydantic import ValidationError
from yaml.scanner import ScannerError
from dump_things_service.abstract_config import (
RecordDirConfigFileContent,
MappingMethod,
mapping_functions,
)
from dump_things_service.converter import get_conversion_objects
from dump_things_service.exceptions import ConfigError
from dump_things_service.model import (
get_model_for_schema,
get_schema_model_for_schema,
get_schema_view,
)
# Logger shared by the service; the name is a fixed string, not __name__.
logger = logging.getLogger('dump_things_service')

# Default file name used by get_record_dir_config() to locate the
# configuration file inside a record directory.
record_dir_config_file_name = '.dumpthings.yaml'

# Entry names grouped for exclusion: the two special directory entries and
# the configuration file itself.
# NOTE(review): presumably skipped when scanning a record directory -- the
# code that consumes this set is not part of this module.
ignored_files = {
    '.',
    '..',
    record_dir_config_file_name,
}
@dataclasses.dataclass
class PydanticModuleInfo:
    """Pair a module object with the variable name reported for it.

    Instances are built in ``get_schema_info()`` from the first and third
    values returned by ``get_model_for_schema()``.
    """

    # The module object itself.
    module: ModuleType
    # Name string delivered together with the module.
    # NOTE(review): exact meaning is defined by get_model_for_schema().
    module_var_name: str
@dataclasses.dataclass
class SchemaInfo:
    """Bundle of the objects derived from one schema location.

    ``get_schema_info()`` fills every field from helper functions that all
    receive the same schema location.
    """

    # Result of get_schema_view() for the schema.
    schema_view: SchemaView
    # Class-name list returned by get_model_for_schema().
    classes: list[str]
    # Module and variable name returned by get_model_for_schema().
    pydantic_module_info: PydanticModuleInfo
    # Result of get_schema_model_for_schema() for the schema.
    python_module: ModuleType
    # Result of get_conversion_objects() for the schema.
    conversion_objects: tuple[Any, Any]
@dataclasses.dataclass
class InstanceStateCollectionInfo:
    """Per-collection entry stored in ``InstanceState.collections``."""

    # Set of class names.
    # NOTE(review): presumably the classes enabled for the collection --
    # confirm against the code that populates it.
    active_classes: set[str]
    # String-to-string mapping with tag details.
    # NOTE(review): key/value semantics are not visible in this module.
    tag_info: dict[str, str]
@cache
def get_schema_info(schema_location: str):
    """Collect all schema-derived objects for ``schema_location``.

    The result is memoized with ``functools.cache``: calling again with an
    equal ``schema_location`` returns the same ``SchemaInfo`` object without
    invoking the helpers a second time.

    Args:
        schema_location: Location of the schema; passed unchanged to every
            helper function.

    Returns:
        A ``SchemaInfo`` holding the schema view, class list, pydantic module
        info, python module, and conversion objects.
    """
    # The helpers are invoked in a fixed order; keep it stable in case they
    # share state.
    pydantic_module, class_names, var_name = get_model_for_schema(schema_location)
    view = get_schema_view(schema_location)
    module_info = PydanticModuleInfo(
        module=pydantic_module,
        module_var_name=var_name,
    )
    schema_module = get_schema_model_for_schema(schema_location)
    converters = get_conversion_objects(schema_location)

    return SchemaInfo(
        schema_view=view,
        classes=class_names,
        pydantic_module_info=module_info,
        python_module=schema_module,
        conversion_objects=converters,
    )
@dataclasses.dataclass
class InstanceState:
    """Mutable state container for one running service instance.

    Three fields are required at construction time; every other field starts
    as a fresh empty container of its own (``default_factory``), so
    instances never share their containers.
    """

    # foundational information from command line or initialization code
    store_path: Path
    bootstrap_token: str | None

    # Dynamically created elements
    fastapi_app: FastAPI

    # Influenced by maintainer interface
    maintenance_mode: set = dataclasses.field(default_factory=set)

    # Created based on abstract configuration
    collections: dict[str, InstanceStateCollectionInfo] = dataclasses.field(
        default_factory=dict,
    )
    tokens: dict = dataclasses.field(default_factory=dict)
    auth_sources: dict[str, list] = dataclasses.field(default_factory=dict)
    audit_backends: dict[str, list] = dataclasses.field(default_factory=dict)
    curated_stores: dict = dataclasses.field(default_factory=dict)
    incoming_stores: dict = dataclasses.field(default_factory=dict)
    schema_info: dict[str, SchemaInfo] = dataclasses.field(
        default_factory=dict,
    )
    validators: dict = dataclasses.field(default_factory=dict)
    order_by: list[str] = dataclasses.field(default_factory=list)
# Process-wide singleton: set once by create_instance_state() and read by
# get_instance_state(). It stays None until the state has been created.
g_instance_state: InstanceState | None = None
def create_instance_state(
    store_path: Path,
    bootstrap_token: str,
    fastapi_app: FastAPI,
) -> InstanceState:
    """Create the module-level ``InstanceState`` on first use.

    If a state already exists, the arguments are ignored, a warning is
    logged, and the existing state is returned unchanged.

    Args:
        store_path: Stored as ``InstanceState.store_path``.
        bootstrap_token: Stored as ``InstanceState.bootstrap_token``.
        fastapi_app: Stored as ``InstanceState.fastapi_app``.

    Returns:
        The module-level instance state (newly created or pre-existing).
    """
    global g_instance_state

    # First call: build the state and publish it in the module global.
    if not g_instance_state:
        g_instance_state = InstanceState(
            store_path=store_path,
            bootstrap_token=bootstrap_token,
            fastapi_app=fastapi_app,
        )
        return g_instance_state

    # Repeated call: keep the existing state, only report the duplicate.
    logger.warning('create_instance_state() already called')
    return g_instance_state
def get_instance_state() -> InstanceState:
    """Return the module-level ``InstanceState``.

    Returns:
        The state previously stored in ``g_instance_state``.

    Raises:
        RuntimeError: If no state has been stored yet, i.e. the function is
            called before ``create_instance_state()``.
    """
    # Reading a module global needs no ``global`` declaration.
    current_state = g_instance_state
    if current_state:
        return current_state

    msg = 'get_instance_state() called before create_instance_state()'
    raise RuntimeError(msg)
def get_record_dir_config(
    path: Path,
    file_name: str = record_dir_config_file_name,
) -> RecordDirConfigFileContent:
    """Load and validate the configuration file of a record directory.

    Args:
        path: Directory expected to contain the configuration file.
        file_name: Name of the configuration file inside ``path``.

    Returns:
        The file content parsed into a ``RecordDirConfigFileContent``.

    Raises:
        ConfigError: If the file does not exist, cannot be parsed as YAML,
            does not hold a mapping at its top level, or is rejected by the
            pydantic model.
    """
    config_path = path / file_name
    if not config_path.exists():
        msg = f'Config file does not exist: {config_path}'
        raise ConfigError(msg)

    # Catch the PyYAML base class: scanner errors are only one of several
    # failure kinds (parser, composer, constructor and reader errors exist
    # too) and all of them mean the config file is unusable.
    try:
        content = yaml.load(config_path.read_text(), Loader=yaml.SafeLoader)
    except yaml.YAMLError as e:
        msg = f'YAML-error while reading config file {config_path}: {e}'
        raise ConfigError(msg) from e

    # An empty file loads as None and a top-level list/scalar is not a
    # mapping either; unpacking those with ``**`` would raise a bare
    # TypeError instead of a ConfigError.
    if not isinstance(content, dict):
        msg = (
            f'Config file {config_path} must contain a YAML mapping, '
            f'got: {type(content).__name__}'
        )
        raise ConfigError(msg)

    try:
        return RecordDirConfigFileContent(**content)
    except ValidationError as e:
        msg = f'Pydantic-error reading config file {config_path}: {e}'
        raise ConfigError(msg) from e
def get_mapping_function_by_name(mapping_function_name: str) -> Callable:
    """Look up a mapping function by the value of its ``MappingMethod``.

    Args:
        mapping_function_name: Value that is converted with
            ``MappingMethod(...)``.

    Returns:
        The entry of ``mapping_functions`` registered for that method.
    """
    # Convert the plain string into the key type used by the registry.
    method = MappingMethod(mapping_function_name)
    return mapping_functions[method]
def get_mapping_function(collection_config: RecordDirConfigFileContent):
    """Return the mapping function selected by a record-dir configuration.

    Args:
        collection_config: Configuration whose ``idfx`` attribute is used as
            the key into ``mapping_functions``.

    Returns:
        The entry of ``mapping_functions`` stored under that key.
    """
    selected_method = collection_config.idfx
    return mapping_functions[selected_method]