136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
import logging
|
|
from itertools import count
|
|
|
|
from fastapi import FastAPI
|
|
|
|
from dump_things_service import config
|
|
from dump_things_service.abstract_config import Configuration
|
|
from dump_things_service.instance_state import InstanceState
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
|
|
|
|
_endpoint_template = """
|
|
async def {name}(
|
|
data: {model_var_name}.{class_name} | Annotated[str, Body(media_type='text/plain')],
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
format: Format = Format.json,
|
|
) -> JSONResponse | PlainTextResponse:
|
|
logger.info('{name}(%s, %s, %s, %s)', repr(data), repr('{class_name}'), repr({model_var_name}), repr(format))
|
|
return {handler}('{collection}', data, '{class_name}', {model_var_name}, format, api_key)
|
|
"""
|
|
|
|
|
|
def create_store_endpoints(
|
|
app: FastAPI,
|
|
configuration: Configuration,
|
|
tag_info: list[dict[str, str]],
|
|
placeholder: str,
|
|
global_dict: dict,
|
|
):
|
|
# Create endpoints for all classes in all collections
|
|
logger.info('Creating dynamic store_record endpoints...')
|
|
serial_number = count()
|
|
|
|
generated_tags = []
|
|
|
|
for collection_name, collection in configuration.collections.items():
|
|
|
|
tag_name = f'Write records to collection "{collection}"'
|
|
|
|
model, model_var_name = None, 'model_' + collection_name
|
|
|
|
global_dict[model_var_name] = model
|
|
for class_name in instance_state.use_classes[collection]:
|
|
|
|
# Create an endpoint to dump data of type `class_name` in version
|
|
# `version` of schema `application`.
|
|
endpoint_name = f'_endpoint_{next(serial_number)}'
|
|
|
|
endpoint_source = _endpoint_template.format(
|
|
name=endpoint_name,
|
|
model_var_name=model_var_name,
|
|
class_name=class_name,
|
|
collection=collection,
|
|
info=f"'store {collection}/{class_name} objects'",
|
|
handler='store_record',
|
|
)
|
|
exec(endpoint_source, global_dict) # noqa S102
|
|
|
|
# Create an API route for the endpoint
|
|
app.add_api_route(
|
|
path=f'/{collection}/record/{class_name}',
|
|
endpoint=global_dict[endpoint_name],
|
|
methods=['POST'],
|
|
name=f'store "{class_name}" object (schema: {model.linkml_meta["id"]})',
|
|
response_model=None,
|
|
tags=[tag_name]
|
|
)
|
|
|
|
generated_tags.append({
|
|
'name': tag_name,
|
|
'description': '',
|
|
})
|
|
|
|
index = tag_info.index({'name': placeholder, 'description': ''})
|
|
tag_info[index:index + 1] = generated_tags
|
|
|
|
logger.info('Creation of %d endpoints completed.', next(serial_number))
|
|
|
|
|
|
def create_validate_endpoints(
|
|
app: FastAPI,
|
|
configuration: Configuration,
|
|
tag_info: list[dict[str, str]],
|
|
placeholder: str,
|
|
global_dict: dict,
|
|
):
|
|
# Create endpoints for all classes in all collections
|
|
logger.info('Creating dynamic validate_record endpoints...')
|
|
serial_number = count()
|
|
|
|
generated_tags = []
|
|
|
|
for collection_name, collection in configuration.collections.items():
|
|
|
|
tag_name = f'Validate records for collection "{collection}"'
|
|
|
|
model, model_var_name = None, 'model_' + collection_name
|
|
|
|
global_dict[model_var_name] = model
|
|
for class_name in instance_state.use_classes[collection]:
|
|
|
|
# Create an endpoint to dump data of type `class_name` in version
|
|
# `version` of schema `application`.
|
|
endpoint_name = f'_endpoint_validate_{next(serial_number)}'
|
|
|
|
endpoint_source = _endpoint_template.format(
|
|
name=endpoint_name,
|
|
model_var_name=model_var_name,
|
|
class_name=class_name,
|
|
collection=collection,
|
|
info=f"'validate {collection}/{class_name} objects'",
|
|
handler='validate_record',
|
|
)
|
|
exec(endpoint_source, global_dict) # noqa S102
|
|
|
|
# Create an API route for the endpoint
|
|
instance_state.fastapi_app.add_api_route(
|
|
path=f'/{collection}/validate/record/{class_name}',
|
|
endpoint=global_dict[endpoint_name],
|
|
methods=['POST'],
|
|
name=f'Validate "{class_name}" object (schema: {model.linkml_meta["id"]})',
|
|
response_model=None,
|
|
tags=[tag_name]
|
|
)
|
|
|
|
generated_tags.append({
|
|
'name': tag_name,
|
|
'description': '',
|
|
})
|
|
|
|
index = tag_info.index({'name': placeholder, 'description': ''})
|
|
tag_info[index:index + 1] = generated_tags
|
|
|
|
logger.info('Creation of %d endpoints completed.', next(serial_number))
|