dump-things-server/dump_things_service/curated.py
Christian Monch 46e3cb2b9a
All checks were successful
Test execution / Test-all (push) Successful in 2m9s
[temp] cleanup round 1
2026-05-13 16:42:57 +02:00

331 lines
9.2 KiB
Python

from __future__ import annotations
import logging
from itertools import count
from typing import TYPE_CHECKING
from fastapi import (
APIRouter,
Depends,
FastAPI,
HTTPException,
)
from fastapi_pagination import (
Page,
add_pagination,
paginate,
)
from dump_things_service import (
HTTP_401_UNAUTHORIZED,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_CONTENT, abstract_config,
)
from dump_things_service.abstract_config import check_collection, read_config, \
get_config, get_token_config_for_representation_and_collection
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.backends.schema_type_layer import _SchemaTypeLayer
from dump_things_service.exceptions import CurieResolutionError
from dump_things_service.instance_state import get_instance_state
from dump_things_service.lazy_list import ModifierList
from dump_things_service.utils import (
authenticate_token,
check_bounds,
cleaned_json,
wrap_http_exception,
)
if TYPE_CHECKING:
from pydantic import BaseModel
from dump_things_service.backends import StorageBackend
from dump_things_service.lazy_list import LazyList
from dump_things_service.store.model_store import _ModelStore
_endpoint_curated_template = """
async def {name}(
data: {model_var_name}.{class_name},
author_id: str | None = None,
api_key: str = Depends(api_key_header_scheme),
) -> JSONResponse:
logger.info(
'{name}(%s, %s, %s)',
repr(data),
repr(author_id),
repr({model_var_name}),
)
return await store_curated_record(
'{collection}',
data,
'{class_name}',
author_id,
api_key,
)
"""
logger = logging.getLogger('dump_things_service')
router = APIRouter()
add_pagination(router)
@router.get(
'/{collection}/curated/records/{class_name}',
tags=['Curated area: read records'],
name='Read all records of the given class from the curated area'
)
async def read_curated_records_of_type(
collection: str,
class_name: str,
matching: str | None = None,
api_key: str | None = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
if class_name not in instance_state.collections[collection].active_classes:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f"No '{class_name}'-class in collection '{collection}'.",
)
return await _read_curated_records(
collection=collection,
class_name=class_name,
pid=None,
matching=matching,
api_key=api_key,
upper_bound=500,
)
@router.get(
'/{collection}/curated/records/p/{class_name}',
tags=['Curated area: read records'],
name='Read all records of the given class from the curated area with pagination'
)
async def read_curated_records_of_type_paginated(
collection: str,
class_name: str,
matching: str | None = None,
api_key: str | None = Depends(api_key_header_scheme),
) -> Page[dict]:
instance_state = get_instance_state()
if class_name not in instance_state.collections[collection].active_classes:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f"No '{class_name}'-class in collection '{collection}'.",
)
record_list = await _read_curated_records(
collection=collection,
class_name=class_name,
pid=None,
matching=matching,
api_key=api_key,
)
return paginate(record_list)
@router.get(
'/{collection}/curated/records/',
tags=['Curated area: read records'],
name='Read all records from the curated area'
)
async def read_curated_all_records(
collection: str,
matching: str | None = None,
api_key: str | None = Depends(api_key_header_scheme),
):
return await _read_curated_records(
collection=collection,
class_name=None,
pid=None,
matching=matching,
api_key=api_key,
upper_bound=500,
)
@router.get(
'/{collection}/curated/records/p/',
tags=['Curated area: read records'],
name='Read all records from the curated area with pagination'
)
async def read_curated_all_records_paginated(
collection: str,
matching: str | None = None,
api_key: str | None = Depends(api_key_header_scheme),
) -> Page[dict]:
record_list = await _read_curated_records(
collection=collection,
class_name=None,
pid=None,
matching=matching,
api_key=api_key,
upper_bound=None,
)
return paginate(record_list)
@router.get(
'/{collection}/curated/record',
tags=['Curated area: read records'],
name='Read the record with the given pid from the curated area'
)
async def read_curated_record_with_pid(
collection: str,
pid: str,
api_key: str = Depends(api_key_header_scheme),
):
return await _read_curated_records(
collection=collection,
class_name=None,
pid=pid,
api_key=api_key,
)
@router.delete(
'/{collection}/curated/record',
tags=['Curated area: delete records'],
name='Delete the record with the given pid from the curated area of the given collection'
)
async def delete_curated_record_with_pid(
collection: str,
pid: str,
api_key: str = Depends(api_key_header_scheme),
):
return await _delete_curated_record(
collection=collection,
pid=pid,
api_key=api_key,
)
async def _read_curated_records(
collection: str,
class_name: str | None,
pid: str | None,
matching: str | None = None,
api_key: str | None = None,
upper_bound: int | None = 1000,
) -> LazyList | dict | None:
model_store, backend = _get_store_and_backend(collection, api_key)
if pid:
record_info = backend.get_record_by_iri(model_store.pid_to_iri(pid))
if record_info:
return record_info.json_object
return None
if class_name:
result_list = backend.get_records_of_classes([class_name], matching)
else:
result_list = backend.get_all_records(matching)
if upper_bound is not None:
check_bounds(
len(result_list),
upper_bound,
collection,
f'/curated/records/p/{class_name}'
if class_name
else '/curated/records/p/',
)
return ModifierList(
result_list,
lambda record_info: record_info.json_object,
)
async def _delete_curated_record(
collection: str,
pid: str | None,
api_key: str | None = None,
) -> bool:
with wrap_http_exception(Exception):
model_store, backend = _get_store_and_backend(collection, api_key)
result = backend.remove_record(model_store.pid_to_iri(pid))
if not result:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f"Could not remove record with PID '{pid}' from curated area "
f"of collection '{collection}'.",
)
return True
def _get_store_and_backend(
collection: str,
plain_token: str | None,
) -> tuple[_ModelStore, StorageBackend]:
# A token is required
if plain_token is None:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail='token required',
)
instance_state = get_instance_state()
abstract_config = read_config(instance_state.store_path)
# Check that the collection exists
check_collection(abstract_config=abstract_config, collection=collection)
# Get token permissions
auth_info = authenticate_token(instance_state, collection, plain_token)
permissions = auth_info.token_permission
if permissions.curated_write is False:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail=f'no write access to curated area of collection `{collection}`',
)
# Get the curated model store
model_store = instance_state.curated_stores[collection]
backend = model_store.backend
if isinstance(backend, _SchemaTypeLayer):
return model_store, backend.backend
return model_store, backend
def store_curated_record(
collection: str,
data: BaseModel,
class_name: str,
author_id: str | None = None,
api_key: str | None = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
instance_state.validators[collection].validate(data)
pid = data.pid
model_store, backend = _get_store_and_backend(collection, api_key)
json_object = cleaned_json(
data.model_dump(exclude_none=True, mode='json'),
remove_keys=('@type',),
)
with wrap_http_exception(CurieResolutionError):
backend.add_record(
model_store.pid_to_iri(pid),
class_name,
json_object,
)
_, token_config, _ = get_token_config_for_representation_and_collection(
abstract_config=get_config(),
token_representation=api_key,
collection_name=collection,
)
for audit_backend in instance_state.audit_backends[collection]:
audit_backend.add_record(
record=json_object,
committer_id=token_config.user_id,
author_id=author_id,
)