import logging
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import (
|
|
FastAPI,
|
|
HTTPException,
|
|
)
|
|
from pydantic import (
|
|
BaseModel,
|
|
TypeAdapter,
|
|
ValidationError,
|
|
)
|
|
|
|
|
|
from dump_things_service import (
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_403_FORBIDDEN,
|
|
HTTP_422_UNPROCESSABLE_CONTENT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
CollectionConfig,
|
|
Configuration,
|
|
ConfigAuthSpec,
|
|
ForgejoAuthSpec,
|
|
RecordDirBackendConfig,
|
|
SQLiteBackendConfig,
|
|
TagSpec,
|
|
read_config,
|
|
check_collection,
|
|
get_backend_and_extension,
|
|
get_default_token_name,
|
|
get_mapping_function,
|
|
)
|
|
from dump_things_service.auth.config import ConfigAuthenticationSource
|
|
from dump_things_service.auth.forgejo import ForgejoAuthenticationSource
|
|
from dump_things_service.backends.record_dir import (
|
|
_RecordDirStore,
|
|
RecordDirStore,
|
|
)
|
|
from dump_things_service.backends.sqlite import (
|
|
_SQLiteBackend,
|
|
SQLiteBackend,
|
|
record_file_name as sqlite_db_filename,
|
|
)
|
|
from dump_things_service.backends.schema_type_layer import SchemaTypeLayer
|
|
from dump_things_service.instance_state import (
|
|
InstanceState,
|
|
get_record_dir_config,
|
|
get_instance_state,
|
|
get_schema_info,
|
|
record_dir_config_file_name, InstanceStateCollectionInfo,
|
|
)
|
|
from dump_things_service.store.model_store import (
|
|
_ModelStore,
|
|
ModelStore,
|
|
)
|
|
from dump_things_service.converter import FormatConverter
|
|
from dump_things_service.exceptions import (
|
|
ConfigError,
|
|
ConfigCollisionError,
|
|
CurieResolutionError,
|
|
)
|
|
from dump_things_service.model import get_model_for_schema
|
|
from dump_things_service.utils import (
|
|
combine_ttl,
|
|
get_token_store,
|
|
join_default_token_permissions,
|
|
wrap_http_exception, get_required_incoming_labels,
|
|
get_required_incoming_info,
|
|
)
|
|
|
|
|
|
# This following lines are required for dynamic endpoint generation
|
|
from typing import Annotated
|
|
from fastapi import Body, Depends
|
|
from dump_things_service import Format
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from starlette.responses import JSONResponse, PlainTextResponse
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')


# Source template for the generic dynamically generated endpoints.
# `create_endpoint` fills in the placeholders, compiles the result with
# `exec`, and registers it as a POST route. The generated function delegates
# to the module-level handler named by the `handler` placeholder and accepts
# either a validated pydantic model instance or a plain-text (ttl) body.
_endpoint_template = """
def {name}(
    data: {model_var_name}.{class_name} | Annotated[str, Body(media_type='text/plain')],
    api_key: str = Depends(api_key_header_scheme),
    format: Format = Format.json,
) -> JSONResponse | PlainTextResponse:
    logger.info('{name}(%s, %s, %s, %s)', repr(data), repr('{class_name}'), repr({model_var_name}), repr(format))
    return {handler}('{collection}', data, '{class_name}', {model_var_name}, format, api_key)
"""


# Template for endpoints that write directly into the curated area of a
# collection. The generated function takes an optional `author_id` query
# parameter and delegates to `store_curated_record`.
_endpoint_curated_template = """
def {name}(
    data: {model_var_name}.{class_name},
    author_id: str | None = None,
    api_key: str = Depends(api_key_header_scheme),
) -> JSONResponse:
    logger.info(
        '{name}(%s, %s, %s)',
        repr(data),
        repr(author_id),
        repr({model_var_name}),
    )
    return store_curated_record(
        '{collection}',
        data,
        '{class_name}',
        author_id,
        api_key,
    )
"""


# Template for endpoints that write into a labeled incoming area of a
# collection. The generated function binds the `label` path parameter and
# delegates to `store_incoming_record`.
_endpoint_incoming_template = """
def {name}(
    data: {model_var_name}.{class_name},
    label: str,
    api_key: str = Depends(api_key_header_scheme),
) -> JSONResponse:
    logger.info(
        '{name}(%s, %s, %s)',
        repr(data),
        repr(label),
        repr({model_var_name}),
    )
    return store_incoming_record(
        '{collection}',
        label,
        data,
        '{class_name}',
        api_key,
    )
"""
|
|
|
|
|
|
def create_collection(
    instance_state: InstanceState,
    configuration: Configuration,
    collection_name: str,
):
    """Create a collection instance as specified by the collection's configuration

    Reuse existing disk structures, if they are compatible. If they are not
    compatible, raise an error. On a directory-creation failure, every
    directory created by this call is removed again before the error is
    re-raised.

    :param instance_state: mutable service state that receives the created
        stores, validators, auth sources, and endpoints
    :param configuration: the complete service configuration; must contain
        `collection_name` in its `collections` mapping
    :param collection_name: name of the collection to create
    :raises ConfigCollisionError: if an existing on-disk store is not
        compatible with the configured backend or schema
    :raises OSError: if a required directory cannot be created
    """
    collection_configuration = configuration.collections[collection_name]
    curated_path = Path(instance_state.store_path / collection_configuration.curated)
    incoming_path = (
        None
        if collection_configuration.incoming is None
        else Path(instance_state.store_path / collection_configuration.incoming)
    )

    # Check for compatibility of all existing stores before creating any
    # structures on disk.
    if curated_path.exists():
        check_store_compatibility(
            curated_path,
            collection_configuration.backend,
            collection_configuration.schema,
        )

    for audit_backend in collection_configuration.audit_backends:
        audit_path = Path(instance_state.store_path / audit_backend.path)
        if audit_path.exists():
            check_audit_compatibility(audit_path)

    # We know now that all existing structures are compatible with the
    # collection specification. We record what we create in order to delete
    # it in case of an error.
    created_directories = []
    try:
        if not curated_path.exists():
            curated_path.mkdir(parents=True)
            created_directories.append(curated_path)

        if incoming_path and not incoming_path.exists():
            incoming_path.mkdir(parents=True)
            created_directories.append(incoming_path)

        for audit_backend in collection_configuration.audit_backends:
            audit_path = Path(instance_state.store_path / audit_backend.path)
            if not audit_path.exists():
                audit_path.mkdir(parents=True)
                created_directories.append(audit_path)

    except OSError:
        # `Path.mkdir` raises `OSError` on failure. Delete all directories
        # that were created in this call so that a failed setup leaves no
        # half-initialized structures behind, then re-raise.
        for directory in created_directories:
            shutil.rmtree(directory)
        raise

    # Create the curated store
    curated_store = create_store(
        instance_state,
        curated_path,
        collection_configuration.backend,
        collection_configuration.schema,
        collection_configuration.submission_tags
    )
    instance_state.curated_stores[collection_name] = curated_store

    # Incoming stores are created on demand when a token is authenticated
    instance_state.incoming_stores[collection_name] = {}

    # Create the schema modules, schema view, and conversion objects
    schema_location = collection_configuration.schema
    instance_state.schema_info[schema_location] = get_schema_info(schema_location)

    # Determine the active classes based on the classes defined in the schema
    # and the configuration of the collection
    active_classes = set(instance_state.schema_info[schema_location].classes)
    if collection_configuration.use_classes:
        active_classes &= set(collection_configuration.use_classes)
    if collection_configuration.ignore_classes:
        active_classes -= set(collection_configuration.ignore_classes)
    instance_state.collections[collection_name] = InstanceStateCollectionInfo(
        active_classes=active_classes
    )

    # Create a validator for the collection
    instance_state.validators[collection_name] = FormatConverter(
        schema=collection_configuration.schema,
        input_format=Format.json,
        output_format=Format.ttl,
    )

    # Create the authentication sources
    for authentication_spec in collection_configuration.auth_sources:
        create_authentication_source(
            configuration,
            collection_name,
            authentication_spec,
            instance_state,
        )

    # Create the dynamic endpoints for record storing & validation, for
    # inbox-storing, and for curated area storing.
    create_endpoints_for_collection(
        instance_state,
        collection_configuration,
        instance_state.fastapi_app,
    )
|
|
|
|
|
|
def create_store(
    instance_state: InstanceState,
    relative_path: Path,
    backend_config: RecordDirBackendConfig | SQLiteBackendConfig,
    schema: str,
    submission_tags: TagSpec,
) -> _ModelStore:
    """Build a model store on top of the configured backend.

    Selects the backend implementation from the type of `backend_config`,
    optionally wraps it in a schema-type layer (extension ``stl``), and
    returns a `ModelStore` that applies the configured submission tags.

    :raises ConfigError: if `backend_config` is of an unsupported type
    """
    backend_type, extension = get_backend_and_extension(backend_config.type)

    if isinstance(backend_config, RecordDirBackendConfig):
        store_backend = create_record_dir_backend(
            instance_state, relative_path, backend_config, schema
        )
    elif isinstance(backend_config, SQLiteBackendConfig):
        store_backend = create_sqlite_backend(instance_state, relative_path)
    else:
        msg = f'Unsupported backend configuration type: {backend_type} ({type(backend_config)})'
        raise ConfigError(msg)

    # An 'stl'-extension adds a schema-type layer on top of the raw backend.
    if extension == 'stl':
        store_backend = SchemaTypeLayer(backend=store_backend, schema=schema)

    tag_mapping = {
        'id': submission_tags.submitter_id_tag,
        'time': submission_tags.submission_time_tag,
    }
    return ModelStore(
        schema=schema,
        backend=store_backend,
        tags=tag_mapping,
    )
|
|
|
|
|
|
def create_record_dir_backend(
    instance_state: InstanceState,
    relative_path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
) -> _RecordDirStore:
    """Create a record-dir backend rooted below the instance store path.

    Writes the record-dir marker config (if missing), instantiates the
    store, and triggers an index build when required.
    """
    store_root = instance_state.store_path / relative_path
    write_record_dir_config(store_root, backend_config, schema)

    store = RecordDirStore(
        root=store_root,
        pid_mapping_function=get_mapping_function(backend_config),
        suffix='yaml',
        order_by=instance_state.order_by,
    )
    store.build_index_if_needed(schema=schema)
    return store
|
|
|
|
|
|
def create_sqlite_backend(
    instance_state: InstanceState,
    relative_path: Path,
) -> _SQLiteBackend:
    """Create a SQLite backend with its database file below the store path."""
    database_file = instance_state.store_path / relative_path / sqlite_db_filename
    return SQLiteBackend(
        db_path=database_file,
        order_by=instance_state.order_by,
    )
|
|
|
|
|
|
def create_authentication_source(
    abstract_configuration: Configuration,
    collection_name: str,
    authentication_spec: ConfigAuthSpec | ForgejoAuthSpec,
    instance_state: InstanceState,
):
    """Instantiate an authentication source and register it for a collection.

    The type of `authentication_spec` selects the implementation
    (config-based or Forgejo-based); the new source is appended to the
    collection's list in `instance_state.auth_sources`.

    :raises ConfigError: if the spec type is not supported
    """
    source_list = instance_state.auth_sources.setdefault(collection_name, [])

    if isinstance(authentication_spec, ConfigAuthSpec):
        new_source = ConfigAuthenticationSource(
            abstract_configuration=abstract_configuration,
            collection_name=collection_name,
        )
    elif isinstance(authentication_spec, ForgejoAuthSpec):
        new_source = ForgejoAuthenticationSource(
            api_url=authentication_spec.url,
            organization=authentication_spec.organization,
            team=authentication_spec.team,
            label_type=authentication_spec.label_type,
            # NOTE(review): `instance_id` is set from the repository name —
            # confirm this is intended and not a copy/paste slip.
            instance_id=authentication_spec.repository,
            repository=authentication_spec.repository,
        )
    else:
        msg = f"Unsupported authentication config type: '{type(authentication_spec)}'"
        raise ConfigError(msg)

    source_list.append(new_source)
|
|
|
|
|
|
def write_record_dir_config(
    path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Write the record-dir marker config file into `path`, unless it exists.

    The file records schema, storage format, and PID-mapping method so that
    later runs can verify compatibility of the existing store.
    """
    assert isinstance(backend_config, RecordDirBackendConfig)

    config_file = path / record_dir_config_file_name
    if config_file.exists():
        # Never overwrite an existing marker file.
        return

    config_file.write_text(f"""# RecordDir Config
type: records
version: 1
schema: {schema}
format: yaml
idfx: {backend_config.mapping_method}
""",
    )
|
|
|
|
|
|
def create_audit_store(*args, **kwargs):
    """Placeholder for audit-store creation; currently a no-op."""
    return None
|
|
|
|
|
|
def check_store_compatibility(
|
|
store_path: Path,
|
|
backend_config: RecordDirBackendConfig | SQLiteBackendConfig,
|
|
schema: str,
|
|
):
|
|
"""Check if an existing store is compatible with the specs in `backend_config`
|
|
|
|
:param store_path:
|
|
:param backend_config:
|
|
:param schema:
|
|
:return:
|
|
"""
|
|
if not store_path.exists():
|
|
return
|
|
if isinstance(backend_config, RecordDirBackendConfig):
|
|
check_record_dir_compatibility(store_path, backend_config, schema)
|
|
elif isinstance(backend_config, SQLiteBackendConfig):
|
|
check_sqlite_compatibility(store_path, backend_config, schema)
|
|
else:
|
|
msg = f"Unsupported backend config type: '{type(backend_config)}'"
|
|
raise ConfigError(msg)
|
|
return
|
|
|
|
|
|
def check_record_dir_compatibility(
    store_path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Verify that an existing record-dir store matches schema and mapping method.

    :raises ConfigCollisionError: if the existing store uses a different
        schema or a different PID-mapping method than configured
    """
    existing_config = get_record_dir_config(store_path)

    if existing_config.schema != schema:
        raise ConfigCollisionError(f"Existing backend uses a different schema: '{existing_config.schema}'")

    existing_method = existing_config.idfx.value
    if existing_method != backend_config.mapping_method:
        msg = f"Configuration specifies mapping method '{backend_config.mapping_method}', existing backend uses mapping method: '{existing_method}'"
        raise ConfigCollisionError(msg)
|
|
|
|
|
|
def check_sqlite_compatibility(
    store_path: Path,
    backend_config: SQLiteBackendConfig,
    schema: str,
):
    """Verify that an existing SQLite store contains a database file.

    `backend_config` and `schema` are accepted for signature parity with the
    other compatibility checks but are not inspected here.

    :raises ConfigError: if no sqlite database file is present
    """
    database_file = store_path / sqlite_db_filename
    if not database_file.exists():
        raise ConfigError('No sqlite database found in existing store')
|
|
|
|
|
|
def check_audit_compatibility(
    audit_path: Path,
):
    """Check if an existing audit is compatible with the specs

    Currently only a stub: a non-existing path is trivially compatible, an
    existing one emits a reminder that the real check is unimplemented.

    :param audit_path: root directory of the (possibly existing) audit store
    """
    if audit_path.exists():
        print('IMPLEMENT: check_audit_compatibility', file=sys.stderr, flush=True)
|
|
|
|
|
|
def create_endpoint(
    operation_name: str,
    operation_path: str,
    instance_state: InstanceState,
    collection_config: CollectionConfig,
    template: str,
    handler: str,
    tag_name: str,
    app: FastAPI,
):
    """Generate and register one POST route per active class of a collection.

    For every active class, an endpoint function is rendered from `template`,
    compiled with `exec`, and registered on `app` under
    ``/{collection}/{operation_path}/{class_name}``.

    :param operation_name: operation label used in names and logs ('store', ...)
    :param operation_path: URL path segment for the operation
    :param instance_state: service state holding the active-class sets
    :param collection_config: configuration of the target collection
    :param template: endpoint source template; must accept the placeholders
        `name`, `model_var_name`, `class_name`, `collection`, `info`, `handler`
    :param handler: name of the module-level handler the endpoint delegates to
    :param tag_name: OpenAPI tag under which the routes are grouped
    :param app: FastAPI application that receives the routes
    """
    # Plain %-style logging format string (no f-prefix: it had no f-string
    # placeholders, so the prefix was inert and misleading).
    logger.info(
        'Creating %s-endpoints for collection: "%s"',
        operation_name,
        collection_config.name,
    )

    # TODO: get schema_info from instance_state!?
    model, classes, model_var_name = get_model_for_schema(collection_config.schema)
    # The generated endpoint source refers to the model by this variable
    # name, so it must be resolvable in this module's global namespace.
    globals()[model_var_name] = model

    active_classes = instance_state.collections[collection_config.name].active_classes
    for class_name in active_classes:
        endpoint_name = f'_endpoint_{collection_config.name}_{operation_name}_{class_name}'
        endpoint_source = template.format(
            name=endpoint_name,
            model_var_name=model_var_name,
            class_name=class_name,
            collection=collection_config.name,
            info=f"'{operation_name} {collection_config.name}/{class_name} objects'",
            handler=handler,
        )
        # The template text is fully controlled by this module (not user
        # input), so exec-ing it is acceptable here.
        exec(endpoint_source, globals())  # noqa S102

        # Create an API route for the endpoint
        app.add_api_route(
            path=f'/{collection_config.name}/{operation_path}/{class_name}',
            endpoint=globals()[endpoint_name],
            methods=['POST'],
            name=f'{operation_name} "{class_name}" object (schema: {model.linkml_meta["id"]})',
            response_model=None,
            tags=[tag_name]
        )

    logger.info(
        'Creation of %d %s-endpoints completed.',
        len(active_classes),
        operation_name,
    )
|
|
|
|
|
|
def create_endpoints_for_collection(
    instance_state: InstanceState,
    collection_config: CollectionConfig,
    app: FastAPI,
):
    """Create all dynamic endpoints for one collection.

    Registers four endpoint families: record storing, record validation,
    curated-area storing, and incoming-area storing.

    :param instance_state: service state the endpoints are created for
    :param collection_config: configuration of the target collection
    :param app: FastAPI application that receives the routes
    """
    for (
        operation_name,
        operation_path,
        template,
        handler,
        tag_name,
    ) in (
        ('store', 'record', _endpoint_template, 'store_record', f'Write records to collection "{collection_config.name}"'),
        ('validate', 'validate', _endpoint_template, 'validate_record', f'Validate records for collection "{collection_config.name}"'),
        # The curated- and incoming-endpoints must use their dedicated
        # templates: those generate functions with the `author_id` resp.
        # `label` parameters and call `store_curated_record` /
        # `store_incoming_record` with the matching argument lists. The
        # generic `_endpoint_template` calls the handler as
        # `(collection, data, class_name, model, format, api_key)`, which
        # does not match these handlers, and never binds the `{label}`
        # path parameter.
        ('curated', 'curated/record', _endpoint_curated_template, 'store_curated_record', f'Store records in curated area of collection "{collection_config.name}"'),
        ('incoming', 'incoming/{label}/record', _endpoint_incoming_template, 'store_incoming_record', f'Store records in incoming area "{{label}}" of collection "{collection_config.name}"'),
    ):
        create_endpoint(
            operation_name=operation_name,
            operation_path=operation_path,
            instance_state=instance_state,
            collection_config=collection_config,
            template=template,
            handler=handler,
            tag_name=tag_name,
            app=app,
        )
|
|
|
|
|
|
def store_record(
    collection: str,
    data: BaseModel | str,
    class_name: str,
    model: Any,
    input_format: Format,
    api_key: str | None = Depends(api_key_header_scheme),
) -> JSONResponse | PlainTextResponse:
    """Validate, authorize, and store one record in `collection`.

    Called from the dynamically generated per-class endpoints. `data` is
    either a validated pydantic model instance (JSON input) or a raw ttl
    string; the response mirrors the input format (JSON in -> JSON out,
    ttl in -> ttl out).

    :param collection: name of the target collection
    :param data: the record, as model instance (json) or ttl text
    :param class_name: schema class name of the record
    :param model: the generated schema model module/namespace for the collection
    :param input_format: declared format of `data`
    :param api_key: client token; `None` selects the collection's default token
    :raises HTTPException: 400 on format/payload mismatch, 403 on missing
        write permission, 422 on conversion or validation errors
    """
    # Reject payloads whose Python type contradicts the declared format:
    # JSON must arrive as a parsed model, ttl must arrive as a plain string.
    if input_format == Format.json and isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid JSON data provided.'
        )

    if input_format == Format.ttl and not isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid ttl data provided.'
        )

    instance_state = get_instance_state()
    # NOTE(review): the configuration is re-read from disk on every request —
    # presumably to pick up configuration changes without restart; confirm.
    abstract_config = read_config(instance_state.store_path)
    check_collection(abstract_config, collection)

    # Fall back to the collection's default token when no API key was sent.
    token = (
        get_default_token_name(abstract_config, collection)
        if api_key is None
        else api_key
    )

    # Get the token permissions and extend them by the default permissions.
    # This call will also convert plaintext tokens into the hashed version of
    # the token, if the token is hashed. This is necessary because we do not
    # store the plaintext token, so all token-information is associated with
    # the hashed representation of the token.
    store, token_permissions, user_id = get_token_store(
        abstract_config,
        instance_state,
        collection,
        token,
    )
    final_permissions = join_default_token_permissions(
        abstract_config,
        instance_state,
        token_permissions,
        collection,
    )
    if not final_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"Not authorized to submit to collection '{collection}'.",
        )

    # For ttl input: convert to a JSON object and validate it against the
    # pydantic class, so the remainder only deals with model instances.
    if input_format == Format.ttl:
        with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Conversion error'):
            json_object = FormatConverter(
                instance_state.schemas[collection],
                input_format=Format.ttl,
                output_format=Format.json,
            ).convert(data, class_name)
        with wrap_http_exception(ValidationError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
            record = TypeAdapter(getattr(model, class_name)).validate_python(json_object)
    else:
        record = data

    # Collection-level (schema) validation of the record.
    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        instance_state.validators[collection].validate(record)

    # Persist; storing may yield multiple (class_name, record) pairs.
    with wrap_http_exception(CurieResolutionError):
        stored_records = store.store_object(obj=record, submitter=user_id)

    # Mirror the input format in the response: ttl in -> ttl out.
    if input_format == Format.ttl:
        format_converter = FormatConverter(
            instance_state.schemas[collection],
            input_format=Format.json,
            output_format=Format.ttl,
        )
        with wrap_http_exception(ValueError, header='Conversion error'):
            return PlainTextResponse(
                combine_ttl(
                    [
                        format_converter.convert(
                            record,
                            class_name,
                        )
                        for class_name, record in stored_records
                    ]
                ),
                media_type='text/turtle',
            )
    return JSONResponse([record for _, record in stored_records])
|