dump-things-server/dump_things_service/collection.py
Christian Monch 20f864155f
Some checks failed
Test execution / Test-all (push) Failing after 1m21s
[temp] debug 1
2026-05-07 15:23:04 +02:00

654 lines
21 KiB
Python

import logging
import shutil
import sys
from pathlib import Path
from typing import Any
from fastapi import (
FastAPI,
HTTPException,
)
from pydantic import (
BaseModel,
TypeAdapter,
ValidationError,
)
from dump_things_service import (
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
HTTP_422_UNPROCESSABLE_CONTENT,
)
from dump_things_service.abstract_config import (
CollectionConfig,
Configuration,
ConfigAuthSpec,
ForgejoAuthSpec,
RecordDirBackendConfig,
SQLiteBackendConfig,
TagSpec,
read_config,
check_collection,
get_backend_and_extension,
get_default_token_name,
get_mapping_function,
)
from dump_things_service.auth.config import ConfigAuthenticationSource
from dump_things_service.auth.forgejo import ForgejoAuthenticationSource
from dump_things_service.backends.record_dir import (
_RecordDirStore,
RecordDirStore,
)
from dump_things_service.backends.sqlite import (
_SQLiteBackend,
SQLiteBackend,
record_file_name as sqlite_db_filename,
)
from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
from dump_things_service.instance_state import (
InstanceState,
get_record_dir_config,
get_instance_state,
get_schema_info,
record_dir_config_file_name, InstanceStateCollectionInfo,
)
from dump_things_service.store.model_store import (
_ModelStore,
ModelStore,
)
from dump_things_service.converter import FormatConverter
from dump_things_service.exceptions import (
ConfigError,
ConfigCollisionError,
CurieResolutionError,
)
from dump_things_service.model import get_model_for_schema
from dump_things_service.utils import (
combine_ttl,
get_token_store,
join_default_token_permissions,
wrap_http_exception, get_required_incoming_labels,
get_required_incoming_info,
)
# This following lines are required for dynamic endpoint generation
from typing import Annotated
from fastapi import Body, Depends
from dump_things_service import Format
from dump_things_service.api_key import api_key_header_scheme
from starlette.responses import JSONResponse, PlainTextResponse
logger = logging.getLogger('dump_things_service')
_endpoint_template = """
def {name}(
data: {model_var_name}.{class_name} | Annotated[str, Body(media_type='text/plain')],
api_key: str = Depends(api_key_header_scheme),
format: Format = Format.json,
) -> JSONResponse | PlainTextResponse:
logger.info('{name}(%s, %s, %s, %s)', repr(data), repr('{class_name}'), repr({model_var_name}), repr(format))
return {handler}('{collection}', data, '{class_name}', {model_var_name}, format, api_key)
"""
_endpoint_curated_template = """
def {name}(
data: {model_var_name}.{class_name},
author_id: str | None = None,
api_key: str = Depends(api_key_header_scheme),
) -> JSONResponse:
logger.info(
'{name}(%s, %s, %s)',
repr(data),
repr(author_id),
repr({model_var_name}),
)
return store_curated_record(
'{collection}',
data,
'{class_name}',
author_id,
api_key,
)
"""
_endpoint_incoming_template = """
def {name}(
data: {model_var_name}.{class_name},
label: str,
api_key: str = Depends(api_key_header_scheme),
) -> JSONResponse:
logger.info(
'{name}(%s, %s, %s)',
repr(data),
repr(label),
repr({model_var_name}),
)
return store_incoming_record(
'{collection}',
label,
data,
'{class_name}',
api_key,
)
"""
def create_collection(
    instance_state: InstanceState,
    configuration: Configuration,
    collection_name: str,
):
    """Create a collection instance as specified by its configuration.

    Reuse existing disk structures, if they are compatible. If they are not
    compatible, raise an error.

    :param instance_state: mutable service state that receives the created
        stores, schema info, validators, and dynamic endpoints
    :param configuration: the complete service configuration
    :param collection_name: name of the collection; must be a key in
        ``configuration.collections``
    :raises ConfigError: for unsupported backend or authentication configs
    :raises ConfigCollisionError: if existing on-disk structures conflict
        with the configured backend or schema
    """
    collection_configuration = configuration.collections[collection_name]
    curated_path = Path(instance_state.store_path / collection_configuration.curated)
    incoming_path = (
        None
        if collection_configuration.incoming is None
        else Path(instance_state.store_path / collection_configuration.incoming)
    )

    # Check for compatibility of all existing stores before creating any
    # structures on disk.
    if curated_path.exists():
        check_store_compatibility(
            curated_path,
            collection_configuration.backend,
            collection_configuration.schema,
        )
    for audit_backend in collection_configuration.audit_backends:
        audit_path = Path(instance_state.store_path / audit_backend.path)
        if audit_path.exists():
            check_audit_compatibility(audit_path)

    # We know now that all existing structures are compatible with the
    # collection specification. We record what we create in order to delete
    # it in case of an error.
    created_directories = []
    try:
        if not curated_path.exists():
            curated_path.mkdir(parents=True)
            created_directories.append(curated_path)
        if incoming_path and not incoming_path.exists():
            incoming_path.mkdir(parents=True)
            created_directories.append(incoming_path)
        for audit_backend in collection_configuration.audit_backends:
            audit_path = Path(instance_state.store_path / audit_backend.path)
            if not audit_path.exists():
                audit_path.mkdir(parents=True)
                created_directories.append(audit_path)
    except OSError:
        # `Path.mkdir` raises `OSError` -- the previous `except ConfigError`
        # could never trigger here, so partially created directories were
        # never cleaned up. Remove everything created in this call, then
        # re-raise.
        for directory in created_directories:
            shutil.rmtree(directory)
        raise

    # Create the curated store
    curated_store = create_store(
        instance_state,
        curated_path,
        collection_configuration.backend,
        collection_configuration.schema,
        collection_configuration.submission_tags,
    )
    instance_state.curated_stores[collection_name] = curated_store

    # Incoming stores are created on demand when a token is authenticated
    instance_state.incoming_stores[collection_name] = {}

    # Create the schema modules, schema view, and conversion objects
    schema_location = collection_configuration.schema
    instance_state.schema_info[schema_location] = get_schema_info(schema_location)

    # Determine the active classes based on the classes defined in the schema
    # and the configuration of the collection
    active_classes = set(instance_state.schema_info[schema_location].classes)
    if collection_configuration.use_classes:
        active_classes &= set(collection_configuration.use_classes)
    if collection_configuration.ignore_classes:
        active_classes -= set(collection_configuration.ignore_classes)
    instance_state.collections[collection_name] = InstanceStateCollectionInfo(
        active_classes=active_classes
    )

    # Create a validator for the collection
    instance_state.validators[collection_name] = FormatConverter(
        schema=collection_configuration.schema,
        input_format=Format.json,
        output_format=Format.ttl,
    )

    # Create the authentication sources
    for authentication_spec in collection_configuration.auth_sources:
        create_authentication_source(
            configuration,
            collection_name,
            authentication_spec,
            instance_state,
        )

    # Create the dynamic endpoints for record storing & validation, for
    # inbox-storing, and for curated area storing.
    create_endpoints_for_collection(
        instance_state,
        collection_configuration,
        instance_state.fastapi_app,
    )
def create_store(
    instance_state: InstanceState,
    relative_path: Path,
    backend_config: RecordDirBackendConfig | SQLiteBackendConfig,
    schema: str,
    submission_tags: TagSpec,
) -> _ModelStore:
    """Build a model store on top of the backend selected by `backend_config`.

    :raises ConfigError: for an unsupported backend configuration type
    """
    backend_type, extension = get_backend_and_extension(backend_config.type)
    if isinstance(backend_config, RecordDirBackendConfig):
        base_backend = create_record_dir_backend(
            instance_state,
            relative_path,
            backend_config,
            schema,
        )
    elif isinstance(backend_config, SQLiteBackendConfig):
        base_backend = create_sqlite_backend(instance_state, relative_path)
    else:
        msg = f'Unsupported backend configuration type: {backend_type} ({type(backend_config)})'
        raise ConfigError(msg)
    # An 'stl'-flavored backend type gets a schema-type layer wrapped around
    # the raw backend.
    if extension == 'stl':
        base_backend = SchemaTypeLayer(backend=base_backend, schema=schema)
    return ModelStore(
        schema=schema,
        backend=base_backend,
        tags={
            'id': submission_tags.submitter_id_tag,
            'time': submission_tags.submission_time_tag,
        },
    )
def create_record_dir_backend(
    instance_state: InstanceState,
    relative_path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
) -> _RecordDirStore:
    """Create (or attach to) a record-dir backend below the store path.

    Writes the record-dir config file if it is missing and (re-)builds the
    record index when necessary.
    """
    store_root = instance_state.store_path / relative_path
    write_record_dir_config(store_root, backend_config, schema)
    store = RecordDirStore(
        root=store_root,
        pid_mapping_function=get_mapping_function(backend_config),
        suffix='yaml',
        order_by=instance_state.order_by,
    )
    store.build_index_if_needed(schema=schema)
    return store
def create_sqlite_backend(
    instance_state: InstanceState,
    relative_path: Path,
) -> _SQLiteBackend:
    """Create a SQLite backend whose database lives below the store path."""
    database_file = instance_state.store_path / relative_path / sqlite_db_filename
    return SQLiteBackend(
        db_path=database_file,
        order_by=instance_state.order_by,
    )
def create_authentication_source(
    abstract_configuration: Configuration,
    collection_name: str,
    authentication_spec: ConfigAuthSpec | ForgejoAuthSpec,
    instance_state: InstanceState,
):
    """Instantiate an authentication source and register it for a collection.

    :raises ConfigError: for an unsupported authentication spec type
    """
    # Ensure the collection has a (possibly empty) source list before the
    # spec is inspected.
    auth_sources = instance_state.auth_sources.setdefault(collection_name, [])
    if isinstance(authentication_spec, ConfigAuthSpec):
        source = ConfigAuthenticationSource(
            abstract_configuration=abstract_configuration,
            collection_name=collection_name,
        )
    elif isinstance(authentication_spec, ForgejoAuthSpec):
        source = ForgejoAuthenticationSource(
            api_url=authentication_spec.url,
            organization=authentication_spec.organization,
            team=authentication_spec.team,
            label_type=authentication_spec.label_type,
            # NOTE(review): `instance_id` and `repository` both receive
            # `authentication_spec.repository` -- confirm this is intended.
            instance_id=authentication_spec.repository,
            repository=authentication_spec.repository,
        )
    else:
        msg = f"Unsupported authentication config type: '{type(authentication_spec)}'"
        raise ConfigError(msg)
    auth_sources.append(source)
def write_record_dir_config(
    path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Write the record-dir config file into `path`, unless one exists.

    :param path: root directory of the record-dir store
    :param backend_config: backend config providing the mapping method
    :param schema: schema identifier recorded in the config file
    :raises ConfigError: if `backend_config` is not a record-dir config
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not isinstance(backend_config, RecordDirBackendConfig):
        msg = f'Expected a RecordDirBackendConfig, got: {type(backend_config)}'
        raise ConfigError(msg)
    record_dir_config_file_path = path / record_dir_config_file_name
    if not record_dir_config_file_path.exists():
        # The file content is YAML; it must stay flush-left.
        record_dir_config_file_path.write_text(f"""# RecordDir Config
type: records
version: 1
schema: {schema}
format: yaml
idfx: {backend_config.mapping_method}
""",
        )
def create_audit_store(*args, **kwargs):
    # Placeholder: audit-store creation is not implemented yet. Accepts any
    # arguments and does nothing, so callers can already wire it in.
    return
def check_store_compatibility(
    store_path: Path,
    backend_config: RecordDirBackendConfig | SQLiteBackendConfig,
    schema: str,
):
    """Verify that an existing store matches the specs in `backend_config`.

    A non-existing store is trivially compatible.

    :param store_path: root directory of the store on disk
    :param backend_config: configured backend specification
    :param schema: configured schema identifier
    :raises ConfigError: for an unsupported backend config type
    :raises ConfigCollisionError: if the existing store conflicts with the spec
    """
    if not store_path.exists():
        # Nothing on disk yet, nothing that could conflict.
        return
    if isinstance(backend_config, RecordDirBackendConfig):
        check_record_dir_compatibility(store_path, backend_config, schema)
    elif isinstance(backend_config, SQLiteBackendConfig):
        check_sqlite_compatibility(store_path, backend_config, schema)
    else:
        msg = f"Unsupported backend config type: '{type(backend_config)}'"
        raise ConfigError(msg)
def check_record_dir_compatibility(
    store_path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Ensure an existing record-dir store matches schema and mapping method.

    :raises ConfigCollisionError: on schema or mapping-method mismatch
    """
    existing_config = get_record_dir_config(store_path)
    if existing_config.schema != schema:
        msg = f"Existing backend uses a different schema: '{existing_config.schema}'"
        raise ConfigCollisionError(msg)
    existing_method = existing_config.idfx.value
    if existing_method != backend_config.mapping_method:
        msg = (
            f"Configuration specifies mapping method '{backend_config.mapping_method}', "
            f"existing backend uses mapping method: '{existing_method}'"
        )
        raise ConfigCollisionError(msg)
def check_sqlite_compatibility(
    store_path: Path,
    backend_config: SQLiteBackendConfig,
    schema: str,
):
    """Ensure an existing sqlite store actually contains a database file.

    `backend_config` and `schema` are accepted for interface symmetry with
    the record-dir check; they are not inspected here.

    :raises ConfigError: if the database file is missing
    """
    database_file = store_path / sqlite_db_filename
    if not database_file.exists():
        raise ConfigError('No sqlite database found in existing store')
def check_audit_compatibility(
    audit_path: Path,
):
    """Check if an existing audit store is compatible with the specs.

    Currently a stub: a missing path is trivially compatible; an existing
    path is only reported as unchecked.

    :param audit_path: root directory of the audit store
    :return: None
    """
    if not audit_path.exists():
        return
    # Report via the module logger instead of printing to stderr, consistent
    # with the rest of this file.
    logger.warning('IMPLEMENT: check_audit_compatibility')
def create_endpoint(
    operation_name: str,
    operation_path: str,
    instance_state: InstanceState,
    collection_config: CollectionConfig,
    template: str,
    handler: str,
    tag_name: str,
    app: FastAPI,
):
    """Generate one POST endpoint per active class of a collection.

    The endpoint source is rendered from `template` with `str.format`,
    compiled via `exec`, and registered on `app` under
    ``/{collection}/{operation_path}/{class_name}``.

    :param operation_name: short operation label, e.g. 'store' or 'validate'
    :param operation_path: URL path segment for the operation
    :param instance_state: provides the active classes of the collection
    :param collection_config: configuration of the collection
    :param template: endpoint source template (placeholders: name,
        model_var_name, class_name, collection, info, handler; unused
        placeholders in a given template are simply ignored by `format`)
    :param handler: name of the handler function the endpoint delegates to
    :param tag_name: OpenAPI tag for the generated routes
    :param app: the FastAPI application to register the routes on
    """
    # Plain %-style logging format string (the previous `f`-prefix had no
    # placeholders and was misleading).
    logger.info(
        'Creating %s-endpoints for collection: "%s"',
        operation_name,
        collection_config.name,
    )
    # TODO: get schema_info from instance_state!?
    model, classes, model_var_name = get_model_for_schema(collection_config.schema)
    # The generated endpoint source references the model module by this
    # global name, so it must be visible in this module's globals.
    globals()[model_var_name] = model
    active_classes = instance_state.collections[collection_config.name].active_classes
    for class_name in active_classes:
        endpoint_name = f'_endpoint_{collection_config.name}_{operation_name}_{class_name}'
        endpoint_source = template.format(
            name=endpoint_name,
            model_var_name=model_var_name,
            class_name=class_name,
            collection=collection_config.name,
            info=f"'{operation_name} {collection_config.name}/{class_name} objects'",
            handler=handler,
        )
        # The template is service-defined, not user input.
        exec(endpoint_source, globals())  # noqa S102
        # Create an API route for the endpoint
        app.add_api_route(
            path=f'/{collection_config.name}/{operation_path}/{class_name}',
            endpoint=globals()[endpoint_name],
            methods=['POST'],
            name=f'{operation_name} "{class_name}" object (schema: {model.linkml_meta["id"]})',
            response_model=None,
            tags=[tag_name]
        )
    logger.info(
        'Creation of %d %s-endpoints completed.',
        len(active_classes),
        operation_name,
    )
def create_endpoints_for_collection(
    instance_state: InstanceState,
    collection_config: CollectionConfig,
    app: FastAPI,
):
    """Create all dynamic endpoints (store, validate, curated, incoming)
    for a single collection.

    :param instance_state: service instance state
    :param collection_config: configuration of the collection
    :param app: the FastAPI application to register the routes on
    """
    for (
        operation_name,
        operation_path,
        template,
        handler,
        tag_name,
    ) in (
        ('store', 'record', _endpoint_template, 'store_record', f'Write records to collection "{collection_config.name}"'),
        ('validate', 'validate', _endpoint_template, 'validate_record', f'Validate records for collection "{collection_config.name}"'),
        # Use the dedicated templates for curated/incoming: the curated
        # endpoint takes an `author_id`, the incoming endpoint a `label`;
        # the generic template provides neither (these were temporarily
        # replaced by `_endpoint_template` in a debug change, leaving the
        # dedicated templates unused).
        ('curated', 'curated/record', _endpoint_curated_template, 'store_curated_record', f'Store records in curated area of collection "{collection_config.name}"'),
        ('incoming', 'incoming/{label}/record', _endpoint_incoming_template, 'store_incoming_record', f'Store records in incoming area "{{label}}" of collection "{collection_config.name}"'),
    ):
        create_endpoint(
            operation_name=operation_name,
            operation_path=operation_path,
            instance_state=instance_state,
            collection_config=collection_config,
            template=template,
            handler=handler,
            tag_name=tag_name,
            app=app,
        )
def store_record(
    collection: str,
    data: BaseModel | str,
    class_name: str,
    model: Any,
    input_format: Format,
    api_key: str | None = Depends(api_key_header_scheme),
) -> JSONResponse | PlainTextResponse:
    """Validate and store a single record submitted to `collection`.

    Called by the dynamically generated endpoints. The record is stored via
    the store selected by the submitter's token, after permission and schema
    validation checks. ttl input is converted to JSON before validation and
    answered with a ttl response; JSON input is answered with JSON.

    :param collection: name of the target collection
    :param data: the record, either as a pydantic model (JSON input) or as
        a ttl string
    :param class_name: name of the schema class of the record
    :param model: the generated model module for the collection's schema
    :param input_format: format of `data` (`Format.json` or `Format.ttl`)
    :param api_key: submitted token; if None, the collection's default
        token name is used
    :raises HTTPException: 400 on format/data mismatch, 403 on missing
        write permission, 422 on conversion or validation errors
    """
    # NOTE(review): `Depends(...)` as a default is only resolved by FastAPI
    # for route functions; the generated endpoints call this function
    # directly and always pass `api_key` explicitly -- confirm intended.
    if input_format == Format.json and isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid JSON data provided.'
        )
    if input_format == Format.ttl and not isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid ttl data provided.'
        )
    instance_state = get_instance_state()
    # The configuration is re-read from disk on every call.
    abstract_config = read_config(instance_state.store_path)
    check_collection(abstract_config, collection)
    # Fall back to the collection's default token if no API key was given.
    token = (
        get_default_token_name(abstract_config, collection)
        if api_key is None
        else api_key
    )
    # Get the token permissions and extend them by the default permissions.
    # This call will also convert plaintext tokens into the hashed version of
    # the token, if the token is hashed. This is necessary because we do not
    # store the plaintext token, so all token-information is associated with
    # the hashed representation of the token.
    store, token_permissions, user_id = get_token_store(
        abstract_config,
        instance_state,
        collection,
        token,
    )
    final_permissions = join_default_token_permissions(
        abstract_config,
        instance_state,
        token_permissions,
        collection,
    )
    if not final_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"Not authorized to submit to collection '{collection}'.",
        )
    if input_format == Format.ttl:
        # Convert ttl input to JSON and validate it against the pydantic
        # class before storing.
        # NOTE(review): this reads `instance_state.schemas[collection]`,
        # while `create_collection` populates `schema_info` keyed by schema
        # location -- confirm `schemas` is populated for every collection.
        with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Conversion error'):
            json_object = FormatConverter(
                instance_state.schemas[collection],
                input_format=Format.ttl,
                output_format=Format.json,
            ).convert(data, class_name)
        with wrap_http_exception(ValidationError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
            record = TypeAdapter(getattr(model, class_name)).validate_python(json_object)
    else:
        record = data
    # Collection-level validation on the (possibly converted) record.
    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        instance_state.validators[collection].validate(record)
    with wrap_http_exception(CurieResolutionError):
        stored_records = store.store_object(obj=record, submitter=user_id)
    if input_format == Format.ttl:
        # Mirror the input format: return the stored records as combined ttl.
        format_converter = FormatConverter(
            instance_state.schemas[collection],
            input_format=Format.json,
            output_format=Format.ttl,
        )
        with wrap_http_exception(ValueError, header='Conversion error'):
            return PlainTextResponse(
                combine_ttl(
                    [
                        format_converter.convert(
                            record,
                            class_name,
                        )
                        for class_name, record in stored_records
                    ]
                ),
                media_type='text/turtle',
            )
    return JSONResponse([record for _, record in stored_records])