dump-things-server/dump_things_service/main.py
Christian Monch 0225d8647b
Some checks failed
Test execution / Test-all (push) Failing after 1m34s
[temp] adjust tests
2026-05-06 17:22:44 +02:00

831 lines
25 KiB
Python

from __future__ import annotations # noqa: I001 -- the patches have to be imported early
import argparse
import logging
from pathlib import Path
from typing import (
Annotated, # noqa F401 -- used by generated code
Any,
TYPE_CHECKING,
)
from dump_things_service.manifest import manifest_configuration
# Perform the patching before importing any third-party libraries
from dump_things_service.patches import enabled # noqa: F401
import uvicorn
from fastapi import (
Body, # noqa F401 -- used by generated code
Depends,
FastAPI,
HTTPException,
Response, # noqa F401 -- used by generated code
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import (
Page,
add_pagination,
paginate,
)
from fastapi_pagination.utils import disable_installed_extensions_check
from pydantic import (
BaseModel,
TypeAdapter,
ValidationError,
)
from starlette.responses import (
JSONResponse,
PlainTextResponse,
RedirectResponse,
)
from dump_things_service import (
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_CONTENT,
Format,
)
from dump_things_service.__about__ import __version__
from dump_things_service.abstract_config import (
check_collection,
get_default_token_name,
read_config,
)
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.converter import (
FormatConverter,
ConvertingList,
)
from dump_things_service.exceptions import CurieResolutionError
from dump_things_service.instance_state import (
create_instance_state,
)
from dump_things_service.lazy_list import (
PriorityList,
ModifierList,
)
from dump_things_service.model import (
get_subclasses,
)
from dump_things_service.collection_endpoints import (
router as collection_router,
)
from dump_things_service.token_endpoints import (
router as token_router,
)
from dump_things_service.utils import (
authenticate_token,
check_bounds,
combine_ttl,
get_token_store,
join_default_token_permissions,
process_token,
wrap_http_exception,
)
if TYPE_CHECKING:
from dump_things_service.lazy_list import LazyList
# Request body of the `/maintenance` endpoint.
class MaintenanceRequest(BaseModel):
    # Name of the collection whose maintenance state is changed.
    collection: str
    # True adds the collection to the maintenance-mode set, False removes it.
    active: bool
# Description of one collection as reported by the `/server` endpoint.
class ServerCollectionResponse(BaseModel):
    # Name of the collection.
    name: str
    # Schema associated with the collection.
    # NOTE(review): `schema` is also the name of an attribute on pydantic's
    # `BaseModel`; pydantic may emit a shadowing warning for it -- confirm.
    schema: str
    # Names of the classes available in the collection.
    classes: list[str]
# `ServerCollectionResponse` extended by an integer `records` field.
class ServerCollectionCountedResponse(ServerCollectionResponse):
    # presumably the number of records in the collection -- TODO confirm
    records: int
# Response model of the `/server` endpoint.
class ServerResponse(BaseModel):
    # Version string of the service.
    version: str
    # One entry per collection, either with or without a record count.
    collections: list[ServerCollectionResponse|ServerCollectionCountedResponse]
# Configure root logging first so that messages emitted during start-up are
# visible. The level of the service logger is set separately from the
# `--log-level` argument once the command line has been parsed.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('dump_things_service')

# Command line interface of the service.
parser = argparse.ArgumentParser()
parser.add_argument('--host', default='0.0.0.0')  # noqa S104
parser.add_argument('--port', default=8000, type=int)
# `--origins` may be given several times; every occurrence appends one entry.
parser.add_argument('--origins', action='append', default=[])
#parser.add_argument(
# '-c',
# '--config',
# metavar='CONFIG_FILE',
# help="Read the configuration from 'CONFIG_FILE' instead of looking for it in the data store root directory. ",
#)
parser.add_argument(
    '--root-path',
    default='',
    help="Set the ASGI 'root_path' for applications submounted below a given URL path.",
)
parser.add_argument(
    '--log-level',
    default='WARNING',
    # The help text names the default exactly as it is configured above.
    help="Set the log level for the service, allowed values are 'ERROR', 'WARNING', 'INFO', 'DEBUG'. Default is 'WARNING'.",
)
parser.add_argument(
    'store',
    help='The root of the data stores, it should contain a global_store and token_stores.',
)
# Markdown text that is passed to FastAPI as the description of the API.
description = """
A service to store and retrieve data that is structured according to given
schemata.
Data is stored in **collections**.
Each collection has a name and an associated schema.
All data records in the collection have to adhere to the given schema.
Users store data in an incoming area and read data from a curated area and their
incoming area. There can be many incoming areas, but only one curated area.
Curators store data in an incoming area or in the curated area and read data
from any incoming area or the curated area.
For more information refer to the [README-file](https://github.com/christian-monch/dump-things-server?tab=readme-ov-file#dump-things-service)
of the project.
"""
# (name, description) pairs of the OpenAPI tags, in the order in which they
# are handed to FastAPI. The `placeholder_*` entries carry empty descriptions.
_TAG_ENTRIES = (
    ('Server management', 'General server operations'),
    ('Read records', 'Read records from the given collection'),
    ('placeholder_write', ''),
    ('placeholder_validate', ''),
    (
        'Delete records',
        'Delete records from the incoming area associated with the authorization token',
    ),
    (
        'Curated area: read records',
        'Read records only from the curated area of the given collection (requires **curator token**)',
    ),
    ('placeholder_curated_write', ''),
    (
        'Curated area: delete records',
        'Delete records from the curated area of the given collection (requires **curator token**)',
    ),
    (
        'Incoming area: read labels',
        'Read labels of all incoming areas for the given collection (requires **curator token**)',
    ),
    (
        'Incoming area: read records',
        'Read records from the given incoming area of the given collection (requires **curator token**)',
    ),
    ('placeholder_incoming_write', ''),
    (
        'Incoming area: delete records',
        'Delete records from the given incoming area of the given collection (requires **curator token**)',
    ),
)

# Tag metadata as a list of `{'name': ..., 'description': ...}` dictionaries.
tag_info = [
    {'name': tag_name, 'description': tag_description}
    for tag_name, tag_description in _TAG_ENTRIES
]
# The command line is parsed at import time; the remaining module-level
# set-up depends on the parsed values.
arguments = parser.parse_args()

# Set the log level. Names are matched case-insensitively against the
# attributes of the `logging` module; anything that does not resolve to an
# integer level is reported and the logger level is left unchanged.
numeric_level = getattr(logging, arguments.log_level.upper(), None)
if not isinstance(numeric_level, int):
    logger.error(
        'Invalid log level: %s, defaulting to level "WARNING"', arguments.log_level
    )
else:
    logger.setLevel(level=numeric_level)

# The store root has to exist before any state is created from it.
store_path = Path(arguments.store).resolve()
if not store_path.exists():
    # Lazy %-style arguments, matching the logger call above.
    logger.error('Store path does not exist: %s', store_path)
    raise SystemExit(1)
# `fastapi_pagination` helper, invoked once before the application is created.
disable_installed_extensions_check()

# The FastAPI application object; all routers and endpoints below attach to it.
app = FastAPI(
    title='Dump Things Service',
    description=description,
    version=__version__,
    openapi_tags=tag_info
)
#app.include_router(collection_router)
#app.include_router(curated_router)
#app.include_router(incoming_router)
app.include_router(token_router)
app.include_router(collection_router)
# Add CORS origins
# The allowed origins come from the repeatable `--origins` command line
# option; all methods and headers are allowed for those origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=arguments.origins,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
# Add pagination
add_pagination(app)
#new_process_config(
# store_path=store_path,
# fastapi_app=app,
# order_by=['pid'],
# globals_dict=globals(),
#)
#g_instance_config = get_config()
# NOTE(review): `g_instance_config` is no longer assigned (see the disabled
# line above), but functions further down in this module still reference it.

# Create the instance state for the store root and the application.
# NOTE(review): the bootstrap token is hard-coded; presumably it grants
# administrative access -- confirm, and consider supplying it from outside
# the source code.
g_instance_state = create_instance_state(
    store_path=store_path,
    bootstrap_token='admin-1',
    fastapi_app=app,
)
# Read the configuration from the store root and apply it to the instance
# state.
g_configuration = read_config(store_path)
manifest_configuration(
    configuration=g_configuration,
    instance_state=g_instance_state,
)
# Disabled endpoint-creation calls, kept as a string literal bound to `x`;
# the string is not evaluated anywhere in this module.
x = """
create_store_endpoints(
app=app,
configuration=g_configuration,
tag_info=tag_info,
placeholder='placeholder_write',
global_dict=globals(),
)
create_validate_endpoints(
app=app,
configuration=g_configuration,
tag_info=tag_info,
placeholder='placeholder_validate',
global_dict=globals(),
)
"""
# Drop any cached OpenAPI schema and run the application's `setup()` again.
g_instance_state.fastapi_app.openapi_schema = None
g_instance_state.fastapi_app.setup()
def xxx_store_record(
    collection: str,
    data: BaseModel | str,
    class_name: str,
    model: Any,
    input_format: Format,
    api_key: str | None = Depends(api_key_header_scheme),
) -> JSONResponse | PlainTextResponse:
    """Store a record in the incoming store that belongs to the token.

    The function is not registered as an endpoint in this module.

    NOTE(review): the helper calls below use different argument lists than the
    corresponding calls in `validate_record`, and `g_instance_config` has no
    visible assignment in this module. The function looks stale and has to be
    aligned with the current helper signatures before it is re-enabled.

    :param collection: name of the collection to store into
    :param data: a model instance for `Format.json`, a string for `Format.ttl`
    :param class_name: name of the class of the record
    :param model: object that provides the class `class_name` as an attribute
    :param input_format: format of `data`; also selects the response format
    :param api_key: token from the request; if `None`, the default token name
        of the collection is used
    :return: the stored records, as turtle text for `Format.ttl` input and as
        JSON otherwise
    :raises HTTPException: 400 if `data` does not match `input_format`, 403 if
        the token lacks `incoming_write` permission
    """
    if input_format == Format.json and isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid JSON data provided.'
        )
    if input_format == Format.ttl and not isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid ttl data provided.'
        )
    check_collection(g_instance_state, collection)
    token = (
        get_default_token_name(g_instance_state, collection)
        if api_key is None
        else api_key
    )
    # Get the token permissions and extend them by the default permissions.
    # This call will also convert plaintext tokens into the hashed version of
    # the token, if the token is hashed. This is necessary because we do not
    # store the plaintext token, so all token-information is associated with
    # the hashed representation of the token.
    store, _, token_permissions, user_id = get_token_store(
        g_instance_state,
        collection,
        token,
    )
    final_permissions = join_default_token_permissions(
        g_configuration,
        g_instance_config,
        token_permissions,
        collection,
    )
    if not final_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"Not authorized to submit to collection '{collection}'.",
        )
    if input_format == Format.ttl:
        # Turtle input is converted to a JSON object and then validated
        # against the class taken from `model`.
        with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Conversion error'):
            json_object = FormatConverter(
                g_instance_config.schemas[collection],
                input_format=Format.ttl,
                output_format=Format.json,
            ).convert(data, class_name)
        with wrap_http_exception(ValidationError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
            record = TypeAdapter(getattr(model, class_name)).validate_python(json_object)
    else:
        record = data
    # NOTE(review): assumed to run for both input formats; the nesting could
    # not be determined from the available source -- confirm.
    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        g_instance_config.validators[collection].validate(record)
    with wrap_http_exception(CurieResolutionError):
        stored_records = store.store_object(obj=record, submitter=user_id)
    if input_format == Format.ttl:
        # Answer in the format of the request: convert every stored record
        # back to turtle and combine the results into one document.
        format_converter = FormatConverter(
            g_instance_config.schemas[collection],
            input_format=Format.json,
            output_format=Format.ttl,
        )
        with wrap_http_exception(ValueError, header='Conversion error'):
            return PlainTextResponse(
                combine_ttl(
                    [
                        format_converter.convert(
                            record,
                            class_name,
                        )
                        for class_name, record in stored_records
                    ]
                ),
                media_type='text/turtle',
            )
    return JSONResponse([record for _, record in stored_records])
def validate_record(
    collection: str,
    data: BaseModel | str,
    class_name: str,
    model: Any,
    input_format: Format,
    api_key: str | None = Depends(api_key_header_scheme),
) -> JSONResponse:
    """Check whether a record would be accepted, without storing anything.

    NOTE(review): `g_instance_config` has no visible assignment in this
    module; the schema and validator lookups below need a replacement source.

    :param collection: name of the collection the record is checked against
    :param data: a model instance for `Format.json`, a string for `Format.ttl`
    :param class_name: name of the class of the record
    :param model: object that provides the class `class_name` as an attribute
    :param input_format: format of `data`
    :param api_key: token from the request; if `None`, the default token name
        of the collection is used
    :return: `JSONResponse(True)` if all checks passed
    :raises HTTPException: 400 if `data` does not match `input_format`, 403 if
        the token lacks `incoming_write` permission
    """
    data_is_text = isinstance(data, str)
    if input_format == Format.json and data_is_text:
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid JSON data provided.'
        )
    if input_format == Format.ttl and not data_is_text:
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid ttl data provided.'
        )

    check_collection(g_configuration, collection)
    if api_key is None:
        token = get_default_token_name(g_configuration, collection)
    else:
        token = api_key

    # Only the permissions of the token are needed for validation.
    _, _, token_permissions, _ = get_token_store(
        g_configuration,
        g_instance_state,
        collection,
        token,
    )
    final_permissions = join_default_token_permissions(
        g_configuration,
        g_instance_state,
        token_permissions,
        collection,
    )
    if not final_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"Not authorized to validate records for collection '{collection}'.",
        )

    if input_format != Format.ttl:
        # JSON input: run the validator of the collection on the record
        # (presumably a trial conversion to TTL -- TODO confirm).
        with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
            g_instance_config.validators[collection].validate(data)
        return JSONResponse(True)

    # Turtle input: convert to a JSON object, then validate that object
    # against the class taken from `model`.
    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Conversion error'):
        json_object = FormatConverter(
            g_instance_config.schemas[collection],
            input_format=Format.ttl,
            output_format=Format.json,
        ).convert(data, class_name)
    with wrap_http_exception(ValidationError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        TypeAdapter(getattr(model, class_name)).validate_python(json_object)
    return JSONResponse(True)
@app.get('/', response_class=RedirectResponse)
async def root() -> RedirectResponse:
    # Redirect the service root to the interactive API documentation.
    # The target is prefixed with the configured `--root-path`, so that the
    # redirect stays below the mount point when the service is submounted.
    # With the default (empty) root path the target is '/docs'.
    return RedirectResponse(f"{arguments.root_path.rstrip('/')}/docs")
@app.get(
    '/server',
    tags=['Server management'],
    name='get server information'
)
async def server() -> ServerResponse:
    # Report the service version and, for every collection, its name, its
    # schema, and its classes.
    # NOTE(review): `g_instance_config` has no visible assignment in this
    # module; the lookups below need a replacement source.
    collection_infos = []
    for collection_name in g_instance_config.collections:
        collection_infos.append(
            ServerCollectionResponse(
                name=collection_name,
                schema=g_instance_config.schemas[collection_name],
                classes=g_instance_config.model_info[collection_name][1],
            )
        )
    return ServerResponse(version=__version__, collections=collection_infos)
@app.post(
    '/maintenance',
    tags=['Server management'],
    name='put a collection in maintenance mode'
)
async def maintenance(
    body: MaintenanceRequest,
    api_key: str | None = Depends(api_key_header_scheme),
):
    # Switch maintenance mode for `body.collection` on or off. The request
    # needs a token that has curated-read, curated-write and zones-access
    # permissions; a missing token and missing permissions are both answered
    # with status 400.
    # NOTE(review): `g_instance_config` has no visible assignment in this
    # module; the calls below need a replacement source.
    if api_key is None:
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST,
            detail='Token required for this operation',
        )

    collection = body.collection

    # Try to authenticate the token with the authentication providers that
    # are associated with the collection.
    check_collection(g_instance_config, collection)
    auth_info = authenticate_token(g_instance_config, collection, api_key)
    permissions = auth_info.token_permission
    if not (
        permissions.curated_write
        and permissions.curated_read
        and permissions.zones_access
    ):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST,
            detail='Curator permissions required for this operation',
        )

    if body.active:
        g_instance_config.maintenance_mode.add(collection)
    elif collection in g_instance_config.maintenance_mode:
        # Switching maintenance mode off for a collection that is not in
        # maintenance mode is a no-op instead of an unhandled error.
        g_instance_config.maintenance_mode.remove(collection)
@app.get(
    '/{collection}/record',
    tags=['Read records'],
    name='Read the record with the given PID from the given collection',
)
async def read_record_with_pid(
    collection: str,
    pid: str,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
):
    # Look the record up in the store of the token first (if the token may
    # read incoming records) and fall back to the curated store (if the token
    # may read curated records). The result is `None` if nothing was found, a
    # turtle text response for `Format.ttl`, and the JSON object otherwise.
    check_collection(g_configuration, collection)
    permissions, incoming_store = await process_token(
        g_configuration, g_instance_state, api_key, collection
    )

    record_class, record = None, None
    if permissions.incoming_read:
        with wrap_http_exception(CurieResolutionError, header='CURIE error:'):
            record_class, record = incoming_store.get_object_by_pid(pid)

    if not record and permissions.curated_read:
        with wrap_http_exception(CurieResolutionError, header='CURIE error:'):
            record_class, record = g_instance_state.curated_stores[
                collection
            ].get_object_by_pid(pid)

    if not record:
        return None

    if format != Format.ttl:
        return record

    # Turtle was requested: convert the JSON object before returning it.
    converter = FormatConverter(
        schema=g_configuration.collections[collection].schema,
        input_format=Format.json,
        output_format=format,
    )
    with wrap_http_exception(ValueError, header='Conversion error'):
        ttl_record = converter.convert(record, record_class)
    return PlainTextResponse(ttl_record, media_type='text/turtle')
@app.get(
    '/{collection}/records/',
    tags=['Read records'],
    name='Read all records from the given collection',
)
async def read_all_records(
    collection: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
):
    # Non-paginated variant: delegate to `_read_all_records` with an upper
    # limit for the number of result records. The limit keeps the processing
    # time of individual requests short and avoids overloading the server.
    max_unpaginated_records = 1000
    return await _read_all_records(
        collection=collection,
        matching=matching,
        format=format,
        api_key=api_key,
        bound=max_unpaginated_records,
    )
@app.get(
    '/{collection}/records/p/',
    tags=['Read records'],
    name='Read all records from the given collection with pagination',
)
async def read_all_records_paginated(
    collection: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
) -> Page[dict | str]:
    # Paginated variant: no upper limit is passed to `_read_all_records`;
    # the complete result list is handed to `paginate`.
    return paginate(
        await _read_all_records(
            collection=collection,
            matching=matching,
            format=format,
            api_key=api_key,
            bound=None,
        )
    )
@app.get(
    '/{collection}/records/{class_name}',
    tags=['Read records'],
    name='Read records of the given class (or subclass) from the given collection',
)
async def read_records_of_type(
    collection: str,
    class_name: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
):
    # Non-paginated variant: delegate to `_read_records_of_type` with an upper
    # limit for the number of result records. The limit keeps the processing
    # time of individual requests short and avoids overloading the server.
    max_unpaginated_records = 1000
    return await _read_records_of_type(
        collection=collection,
        class_name=class_name,
        matching=matching,
        format=format,
        api_key=api_key,
        bound=max_unpaginated_records,
    )
@app.get(
    '/{collection}/records/p/{class_name}',
    tags=['Read records'],
    name='Read records of the given class (or subclass) from the given collection with pagination',
)
async def read_records_of_type_paginated(
    collection: str,
    class_name: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
) -> Page[dict | str]:
    # Paginated variant: no upper limit is passed to `_read_records_of_type`;
    # the complete result list is handed to `paginate`.
    return paginate(
        await _read_records_of_type(
            collection=collection,
            class_name=class_name,
            matching=matching,
            format=format,
            api_key=api_key,
            bound=None,
        )
    )
async def _read_all_records(
    collection: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
    bound: int | None = None,
) -> LazyList:
    """Collect all records of a collection that the token may read.

    Records from the store of the token are added if the token has
    `incoming_read` permission, records from the curated store are added if
    it has `curated_read` permission. The combined list is sorted with the
    sort key of the list.

    Configuration and instance state are accessed through `g_configuration`
    and `g_instance_state`, with the same call forms that
    `read_record_with_pid` uses.

    :param collection: name of the collection to read from
    :param matching: optional filter that is passed on to the stores
    :param format: `Format.ttl` yields converted records, any other value
        yields the JSON objects of the records
    :param api_key: token from the request
    :param bound: if truthy, the length of each individual store result is
        passed to `check_bounds` together with this value (presumably to
        reject results that are too large -- TODO confirm)
    :return: a lazy list of records in the requested format
    """
    def convert_to_http_exception(e: BaseException):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST,
            detail=f'Conversion error: {e}',
        ) from e

    check_collection(g_configuration, collection)
    final_permissions, token_store = await process_token(
        g_configuration, g_instance_state, api_key, collection
    )

    result_list = PriorityList()
    if final_permissions.incoming_read:
        token_store_list = token_store.get_all_objects(matching=matching)
        if bound:
            check_bounds(len(token_store_list), bound, collection, 'records/p/')
        result_list.add_list(token_store_list)

    if final_permissions.curated_read:
        curated_store_list = g_instance_state.curated_stores[
            collection
        ].get_all_objects(
            matching=matching,
        )
        if bound:
            check_bounds(len(curated_store_list), bound, collection, 'records/p/')
        result_list.add_list(curated_store_list)

    # Sort the result list.
    result_list.sort(key=result_list.sort_key)

    if format == Format.ttl:
        # Conversion errors are reported to the client as HTTP 400 through
        # the exception handler defined above.
        return ConvertingList(
            result_list,
            g_configuration.collections[collection].schema,
            input_format=Format.json,
            output_format=format,
            exception_handler=convert_to_http_exception,
        )
    return ModifierList(
        result_list,
        lambda record_info: record_info.json_object,
    )
async def _read_records_of_type(
    collection: str,
    class_name: str,
    matching: str | None = None,
    format: Format = Format.json,  # noqa A002
    api_key: str = Depends(api_key_header_scheme),
    bound: int | None = None,
) -> LazyList:
    """Collect the records of a class and its subclasses that the token may read.

    Records from the store of the token are added if the token has
    `incoming_read` permission, records from the curated store are added if
    it has `curated_read` permission. The combined list is sorted with the
    sort key of the list.

    NOTE(review): `g_instance_config` has no visible assignment in this
    module. `model_info` and `use_classes` have no visible replacement, so
    the references are left unchanged here.

    :param collection: name of the collection to read from
    :param class_name: name of the class whose records (including records of
        the classes returned by `get_subclasses`) are read
    :param matching: optional filter that is passed on to the stores
    :param format: `Format.ttl` yields converted records, any other value
        yields the JSON objects of the records
    :param api_key: token from the request
    :param bound: if truthy, the length of each individual store result is
        passed to `check_bounds` together with this value (presumably to
        reject results that are too large -- TODO confirm)
    :return: a lazy list of records in the requested format
    :raises HTTPException: 404 if `class_name` is not among the classes used
        by the collection
    """
    def convert_to_http_exception(e: BaseException):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST,
            detail=f'Conversion error: {e}',
        ) from e

    check_collection(g_instance_config, collection)
    model = g_instance_config.model_info[collection][0]
    if class_name not in g_instance_config.use_classes[collection]:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"No '{class_name}'-class in collection '{collection}'.",
        )

    final_permissions, token_store = await process_token(
        g_instance_config, api_key, collection
    )

    # The class names to search and the endpoint hint do not depend on the
    # store, so both are determined once and shared by the two loops below.
    search_class_names = list(get_subclasses(model, class_name))
    paginated_endpoint = f'/records/p/{class_name}'

    result_list = PriorityList()
    if final_permissions.incoming_read:
        for search_class_name in search_class_names:
            token_store_list = token_store.get_objects_of_class(
                class_name=search_class_name,
                matching=matching,
            )
            if bound:
                check_bounds(len(token_store_list), bound, collection, paginated_endpoint)
            result_list.add_list(token_store_list)

    if final_permissions.curated_read:
        curated_store = g_instance_config.curated_stores[collection]
        for search_class_name in search_class_names:
            curated_store_list = curated_store.get_objects_of_class(
                class_name=search_class_name,
                matching=matching,
            )
            if bound:
                check_bounds(len(curated_store_list), bound, collection, paginated_endpoint)
            result_list.add_list(curated_store_list)

    # Sort the result list.
    result_list.sort(key=result_list.sort_key)

    if format == Format.ttl:
        # Conversion errors are reported to the client as HTTP 400 through
        # the exception handler defined above.
        return ConvertingList(
            result_list,
            g_instance_config.schemas[collection],
            input_format=Format.json,
            output_format=format,
            exception_handler=convert_to_http_exception,
        )
    return ModifierList(
        result_list,
        lambda record_info: record_info.json_object,
    )
@app.delete(
    '/{collection}/record',
    tags=['Delete records'],
    name='Delete record with the given pid from the given collection',
)
async def delete_record(
    collection: str,
    pid: str,
    api_key: str = Depends(api_key_header_scheme),
):
    # Delete the record with the given PID from the store that belongs to the
    # token. Answers 403 if the token lacks `incoming_write` permission and
    # 404 if the store reports that nothing was deleted.
    # Configuration and instance state are accessed through `g_configuration`
    # and `g_instance_state`, with the same call forms that
    # `read_record_with_pid` uses.
    check_collection(g_configuration, collection)
    final_permissions, token_store = await process_token(
        g_configuration, g_instance_state, api_key, collection
    )
    if not final_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"No write access to incoming data in collection '{collection}'.",
        )

    with wrap_http_exception(Exception):
        result = token_store.delete_object(pid)

    if not result:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"Could not remove record with PID '{pid}' from the "
            "token associated incoming area of collection "
            f"'{collection}'.",
        )
    return True
# Create dynamic endpoints and rebuild the app to include all dynamically
# created endpoints.
#create_store_endpoints(app, g_instance_config, tag_info, 'placeholder_write', globals())
#create_validate_endpoints(app, g_instance_config, tag_info, 'placeholder_validate', globals())
#create_curated_endpoints(app, tag_info, 'placeholder_curated_write', globals())
#create_incoming_endpoints(app, tag_info, 'placeholder_incoming_write', globals())
#app.openapi_schema = None
#app.setup()
def main():
    """Serve the application with uvicorn, using the parsed command line."""
    server_options = {
        'host': arguments.host,
        'port': arguments.port,
        'root_path': arguments.root_path,
    }
    uvicorn.run(app, **server_options)


if __name__ == '__main__':
    main()