392 lines
11 KiB
Python
392 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Depends,
|
|
HTTPException,
|
|
)
|
|
from fastapi_pagination import (
|
|
Page,
|
|
add_pagination,
|
|
paginate,
|
|
)
|
|
|
|
from dump_things_service import (
|
|
HTTP_401_UNAUTHORIZED,
|
|
HTTP_404_NOT_FOUND,
|
|
HTTP_422_UNPROCESSABLE_CONTENT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
check_collection,
|
|
check_label,
|
|
get_config_labels,
|
|
get_config,
|
|
)
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from dump_things_service.backends.schema_type_layer import _SchemaTypeLayer
|
|
from dump_things_service.exceptions import CurieResolutionError
|
|
from dump_things_service.instance_state import get_instance_state
|
|
from dump_things_service.lazy_list import ModifierList
|
|
from dump_things_service.utils import (
|
|
authenticate_token,
|
|
check_bounds,
|
|
cleaned_json,
|
|
create_token_store,
|
|
get_on_disk_labels,
|
|
wrap_http_exception,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from pydantic import BaseModel
|
|
|
|
from dump_things_service.backends import StorageBackend
|
|
from dump_things_service.lazy_list import LazyList
|
|
from dump_things_service.store.model_store import _ModelStore
|
|
|
|
|
|
# Module-wide logger; the name is shared with the rest of the service.
logger = logging.getLogger('dump_things_service')

# Router that collects all "incoming area" endpoints declared below.
router = APIRouter()

# Enable fastapi-pagination support for the `Page[...]` endpoints on
# this router.
add_pagination(router)
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/',
    tags=['Incoming area: read labels'],
    name='Get all incoming labels for the given collection'
)
async def incoming_read_labels(
    collection: str,
    api_key: str | None = Depends(api_key_header_scheme),
) -> list[str]:
    """Return the labels of the incoming areas of `collection`.

    The labels reported by `get_config_labels` are merged (via `union`)
    with the labels reported by `get_on_disk_labels`. The caller is
    authorized through `authorize_zones` before anything is read.
    """
    # Authorize api_key
    await authorize_zones(collection, api_key)

    store_path = get_instance_state().store_path
    labels_from_config = get_config_labels(get_config(), collection)
    labels_from_disk = get_on_disk_labels(store_path, get_config(), collection)
    return [*labels_from_config.union(labels_from_disk)]
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/{label}/records/{class_name}',
    tags=['Incoming area: read records'],
    name='Read all records of the given class from the given incoming area'
)
async def incoming_read_records_of_type(
    collection: str,
    label: str,
    class_name: str,
    matching: str | None = None,
    api_key: str | None = Depends(api_key_header_scheme),
):
    """Read all records of class `class_name` from an incoming area.

    Args:
        collection: name of the collection.
        label: label of the incoming area within the collection.
        class_name: class whose records should be returned.
        matching: optional filter, passed through unchanged to the backend.
        api_key: token taken from the API-key header.

    Returns:
        The result of `_incoming_read_records`, requested with an upper
        bound of 500 records.

    Raises:
        HTTPException: raised by `authorize_zones` if the token is missing
            or lacks access, or with status 404 if `class_name` is not an
            active class of the collection.
    """
    # Authorize before touching any per-collection state. Previously the
    # class lookup ran first, so an unknown collection surfaced as an
    # unhandled `KeyError` and unauthenticated callers could probe class
    # names. `authorize_zones` also checks that the collection exists.
    await authorize_zones(collection, api_key)

    instance_state = get_instance_state()
    if class_name not in instance_state.collections[collection].active_classes:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"No '{class_name}'-class in collection '{collection}'.",
        )

    return await _incoming_read_records(
        collection=collection,
        label=label,
        class_name=class_name,
        pid=None,
        matching=matching,
        api_key=api_key,
        upper_bound=500,
    )
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/{label}/records/p/{class_name}',
    tags=['Incoming area: read records'],
    name='Read all records of the given class from the given incoming area with pagination'
)
async def incoming_read_records_of_type_paginated(
    collection: str,
    label: str,
    class_name: str,
    matching: str | None = None,
    api_key: str | None = Depends(api_key_header_scheme),
) -> Page[dict]:
    """Read records of class `class_name` from an incoming area, paginated.

    Args:
        collection: name of the collection.
        label: label of the incoming area within the collection.
        class_name: class whose records should be returned.
        matching: optional filter, passed through unchanged to the backend.
        api_key: token taken from the API-key header.

    Returns:
        A page produced by `paginate` over the matching records.

    Raises:
        HTTPException: raised by `authorize_zones` if the token is missing
            or lacks access, or with status 404 if `class_name` is not an
            active class of the collection.
    """
    # Authorize before touching any per-collection state. Previously the
    # class lookup ran first, so an unknown collection surfaced as an
    # unhandled `KeyError` and unauthenticated callers could probe class
    # names. `authorize_zones` also checks that the collection exists.
    await authorize_zones(collection, api_key)

    instance_state = get_instance_state()
    if class_name not in instance_state.collections[collection].active_classes:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"No '{class_name}'-class in collection '{collection}'.",
        )

    record_list = await _incoming_read_records(
        collection=collection,
        label=label,
        class_name=class_name,
        pid=None,
        matching=matching,
        api_key=api_key,
        # Paginated results are delivered page by page, so no size limit is
        # applied. This matches `incoming_read_all_records_paginated`.
        # Without it the default bound of `_incoming_read_records` would
        # apply and its hint would point back at this endpoint.
        upper_bound=None,
    )
    return paginate(record_list)
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/{label}/records/',
    tags=['Incoming area: read records'],
    name='Read all records from the given incoming area'
)
async def incoming_read_all_records(
    collection: str,
    label: str,
    matching: str | None = None,
    api_key: str | None = Depends(api_key_header_scheme),
):
    """Read the records of every class from incoming area `label`.

    Delegates to `_incoming_read_records` without a class or PID
    restriction and with an upper bound of 500 records. `matching` is
    forwarded unchanged.
    """
    records = await _incoming_read_records(
        collection=collection,
        label=label,
        class_name=None,  # no class restriction
        pid=None,  # no single-record lookup
        matching=matching,
        api_key=api_key,
        upper_bound=500,
    )
    return records
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/{label}/records/p/',
    tags=['Incoming area: read records'],
    name='Read all records from the given incoming area with pagination'
)
async def incoming_read_all_records_paginated(
    collection: str,
    label: str,
    matching: str | None = None,
    api_key: str | None = Depends(api_key_header_scheme),
) -> Page[dict]:
    """Read the records of every class from incoming area `label`, paginated.

    Delegates to `_incoming_read_records` without a class or PID
    restriction and with the bounds check disabled (`upper_bound=None`),
    then wraps the result with `paginate`.
    """
    return paginate(
        await _incoming_read_records(
            collection=collection,
            label=label,
            class_name=None,
            pid=None,
            matching=matching,
            api_key=api_key,
            upper_bound=None,
        )
    )
|
|
|
|
|
|
@router.get(
    '/{collection}/incoming/{label}/record',
    tags=['Incoming area: read records'],
    name='Read the record with the given PID from the given incoming area'
)
async def incoming_read_record_with_pid(
    collection: str,
    label: str,
    pid: str,
    api_key: str = Depends(api_key_header_scheme),
):
    """Read the record identified by `pid` from incoming area `label`.

    Delegates to `_incoming_read_records` with the PID set and no class
    restriction, and returns whatever that helper returns.
    """
    record = await _incoming_read_records(
        collection=collection,
        label=label,
        class_name=None,
        pid=pid,
        api_key=api_key,
    )
    return record
|
|
|
|
|
|
@router.delete(
    '/{collection}/incoming/{label}/record',
    tags=['Incoming area: delete records'],
    name='Delete the record with the given PID from the given incoming area'
)
async def incoming_delete_record_with_pid(
    collection: str,
    label: str,
    pid: str,
    api_key: str = Depends(api_key_header_scheme),
):
    """Delete the record identified by `pid` from incoming area `label`.

    Delegates to `_incoming_delete_record` and returns its result.
    """
    outcome = await _incoming_delete_record(
        collection=collection,
        label=label,
        pid=pid,
        api_key=api_key,
    )
    return outcome
|
|
|
|
|
|
async def _incoming_read_records(
    collection: str,
    label: str,
    class_name: str | None,
    pid: str | None,
    matching: str | None = None,
    api_key: str | None = None,
    upper_bound: int | None = 1000,
) -> LazyList | dict | None:
    """Read one record or a list of records from an incoming area.

    Args:
        collection: name of the collection.
        label: label of the incoming area within the collection.
        class_name: if given (and `pid` is not), only records of this class
            are read.
        pid: if given, only the record with this PID is looked up and
            `class_name`, `matching`, and `upper_bound` are ignored.
        matching: optional filter, passed through unchanged to the backend.
        api_key: token used to authorize the request.
        upper_bound: limit handed to `check_bounds` together with the
            result size; `None` skips the bounds check entirely (this is
            what paginated callers pass).

    Returns:
        For a PID lookup: the record's JSON object, or `None` if the
        backend has no such record. Otherwise a `ModifierList` that maps
        each record info of the backend result to its `json_object`.

    Raises:
        HTTPException: propagated from `_get_store_and_backend` when
            authorization or the label check fails.
            NOTE(review): `check_bounds` presumably raises when the result
            exceeds `upper_bound` -- confirm against its implementation.
    """
    model_store, backend = await _get_store_and_backend(collection, label, api_key)

    # Single-record lookup by PID.
    if pid:
        record_info = backend.get_record_by_iri(model_store.pid_to_iri(pid))
        if record_info:
            return record_info.json_object
        return None

    if class_name:
        result_list = backend.get_records_of_classes([class_name], matching)
    else:
        result_list = backend.get_all_records(matching)

    if upper_bound is not None:
        # The last argument names the paginated counterpart of the endpoint
        # that was called.
        check_bounds(
            len(result_list),
            upper_bound,
            collection,
            f'/incoming/{label}/records/p/{class_name}'
            if class_name
            else f'/incoming/{label}/records/p/'
        )

    # Expose only the JSON object of each record info.
    return ModifierList(
        result_list,
        lambda record_info: record_info.json_object,
    )
|
|
|
|
|
|
async def _incoming_delete_record(
    collection: str,
    label: str,
    pid: str | None,
    api_key: str | None = None,
) -> bool:
    """Remove the record identified by `pid` from incoming area `label`.

    Returns `True` when the backend reports a successful removal and
    raises an `HTTPException` with status 404 otherwise. Exceptions raised
    while resolving the PID or removing the record are routed through
    `wrap_http_exception`.
    """
    model_store, backend = await _get_store_and_backend(collection, label, api_key)

    with wrap_http_exception(Exception):
        record_iri = model_store.pid_to_iri(pid)
        was_removed = backend.remove_record(record_iri)

    if was_removed:
        return True

    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail=f"Could not remove record with PID '{pid}' from incoming "
        f"area '{label}' of collection '{collection}'.",
    )
|
|
|
|
|
|
async def _get_store_and_backend(
    collection: str,
    label: str,
    plain_token: str | None,
) -> tuple[_ModelStore, StorageBackend]:
    """Authorize the caller and return store and backend of an incoming area.

    Args:
        collection: name of the collection.
        label: label of the incoming area within the collection.
        plain_token: token presented by the caller.

    Returns:
        A tuple `(model_store, backend)`. If the store's backend is a
        `_SchemaTypeLayer`, the backend wrapped by that layer is returned
        instead of the layer itself.

    Raises:
        HTTPException: propagated from `authorize_zones` when the token is
            missing or lacks access.
            NOTE(review): `check_label` presumably raises for unknown
            labels -- confirm against its implementation.
    """
    # Authorize api_key
    await authorize_zones(collection, plain_token)

    # Check that the incoming zone exists
    instance_state = get_instance_state()
    abstract_config = get_config()
    check_label(instance_state.store_path, abstract_config, collection, label)

    # Create a store (or get an already created store) for collection
    # `collection` and storage dir `store_dir`.
    store_dir = (
        instance_state.store_path
        / abstract_config.collections[collection].incoming
        / label
    )
    # `create_token_store` will cache and return already created stores with
    # the same collection and storage dir.
    model_store = create_token_store(
        abstract_configuration=abstract_config,
        instance_state=instance_state,
        collection_name=collection,
        store_dir=store_dir,
    )

    # Callers operate on the underlying storage backend, so unwrap a
    # schema-type layer if one is present.
    backend = model_store.backend
    if isinstance(backend, _SchemaTypeLayer):
        return model_store, backend.backend
    return model_store, backend
|
|
|
|
|
|
async def authorize_zones(
    collection: str,
    plain_token: str | None,
):
    """Ensure that `plain_token` may access the incoming zones of `collection`.

    Raises an `HTTPException` with status 401 if no token was supplied or
    if the token's permissions have `zones_access` set to `False`. The
    collection is checked with `check_collection` and the token is
    authenticated with `authenticate_token` in between.
    """
    # A token is required
    if plain_token is None:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail='token required',
        )

    config = get_config()
    state = get_instance_state()

    # Check that the collection exists
    check_collection(config, collection)

    token_permission = authenticate_token(state, collection, plain_token).token_permission
    if token_permission.zones_access is not False:
        return

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=f'no access to incoming zones of collection `{collection}`',
    )
|
|
|
|
|
|
async def store_incoming_record(
    collection: str,
    label: str,
    data: BaseModel,
    class_name: str,
    api_key: str | None = Depends(api_key_header_scheme),
):
    """Validate `data` and add it as a record to incoming area `label`.

    The record is validated with the collection's validator, serialized to
    JSON without `None` values and without the `@type` key, and added to
    the backend under the IRI derived from its PID. Validation errors are
    reported with status 422 through `wrap_http_exception`.
    """
    validator = get_instance_state().validators[collection]
    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        validator.validate(data)

    record_pid = data.pid
    model_store, backend = await _get_store_and_backend(collection, label, api_key)

    dumped = data.model_dump(exclude_none=True, mode='json')
    json_object = cleaned_json(dumped, remove_keys=('@type',))

    with wrap_http_exception(CurieResolutionError):
        record_iri = model_store.pid_to_iri(record_pid)
        backend.add_record(record_iri, class_name, json_object)
|