All checks were successful
Test execution / Test-all (push) Successful in 2m13s
This commit fixes a bug in the endpoint-deletion code. Included routers were treated like `APIRoute` or `Route` instances, i.e., we assumed they have a `path` attribute. This is wrong. We can simply ignore them, because collection-specific routes are only `APIRoute` or `Route` instances.
628 lines
21 KiB
Python
628 lines
21 KiB
Python
import logging
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from datalad_core.runners import (
|
|
call_git_oneline,
|
|
CommandError,
|
|
)
|
|
from fastapi import (
|
|
Depends,
|
|
FastAPI,
|
|
HTTPException,
|
|
)
|
|
from pydantic import (
|
|
BaseModel,
|
|
TypeAdapter,
|
|
ValidationError,
|
|
)
|
|
from starlette.responses import (
|
|
JSONResponse,
|
|
PlainTextResponse,
|
|
)
|
|
from starlette.status import HTTP_401_UNAUTHORIZED
|
|
|
|
from dump_things_service import (
|
|
Format,
|
|
HTTP_400_BAD_REQUEST,
|
|
HTTP_403_FORBIDDEN,
|
|
HTTP_422_UNPROCESSABLE_CONTENT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
CollectionConfig,
|
|
Configuration,
|
|
ConfigAuthSpec,
|
|
ForgejoAuthSpec,
|
|
RecordDirBackendConfig,
|
|
SQLiteBackendConfig,
|
|
read_config,
|
|
check_collection,
|
|
get_default_token_representation,
|
|
)
|
|
from dump_things_service.audit.gitaudit import GitAuditBackend
|
|
from dump_things_service.auth.config import ConfigAuthenticationSource
|
|
from dump_things_service.auth.forgejo import ForgejoAuthenticationSource
|
|
from dump_things_service.backends.record_dir_index import index_file_name
|
|
from dump_things_service.backends.sqlite import record_file_name as sqlite_db_filename
|
|
from dump_things_service.instance_state import (
|
|
InstanceState,
|
|
InstanceStateCollectionInfo,
|
|
get_record_dir_config,
|
|
get_instance_state,
|
|
get_schema_info,
|
|
record_dir_config_file_name,
|
|
)
|
|
from dump_things_service.converter import FormatConverter
|
|
from dump_things_service.exceptions import (
|
|
ConfigError,
|
|
ConfigCollisionError,
|
|
CurieResolutionError,
|
|
)
|
|
from dump_things_service.model import get_model_for_schema
|
|
from dump_things_service.utils import (
|
|
combine_ttl,
|
|
create_store,
|
|
get_token_store,
|
|
join_default_token_permissions,
|
|
var_escape,
|
|
wrap_http_exception,
|
|
)
|
|
|
|
|
|
# This following lines are required for dynamic endpoint generation
|
|
from typing import Annotated # noqa 401 -- used by autogenerated code
|
|
from fastapi import Body # noqa 401 -- used by autogenerated code
|
|
from dump_things_service.api_key import api_key_header_scheme # noqa 401 -- used by autogenerated code
|
|
from dump_things_service.curated import store_curated_record # noqa 401 -- used by autogenerated code
|
|
from dump_things_service.incoming import store_incoming_record # noqa 401 -- used by autogenerated code
|
|
from dump_things_service.validate import validate_record # noqa 401 -- used by autogenerated code
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
|
|
_endpoint_template = """
|
|
def {name}(
|
|
data: {model_var_name}.{class_name} | Annotated[str, Body(media_type='text/plain')],
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
format: Format = Format.json,
|
|
) -> JSONResponse | PlainTextResponse:
|
|
logger.info('{name}(%s, %s, %s, %s)', repr(data), repr('{class_name}'), repr({model_var_name}), repr(format))
|
|
return {handler}('{collection}', data, '{class_name}', {model_var_name}, format, api_key)
|
|
"""
|
|
|
|
_endpoint_curated_template = """
|
|
def {name}(
|
|
data: {model_var_name}.{class_name},
|
|
author_id: str | None = None,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
) -> JSONResponse:
|
|
logger.info(
|
|
'{name}(%s, %s, %s)',
|
|
repr(data),
|
|
repr(author_id),
|
|
repr({model_var_name}),
|
|
)
|
|
return store_curated_record(
|
|
'{collection}',
|
|
data,
|
|
'{class_name}',
|
|
author_id,
|
|
api_key,
|
|
)
|
|
"""
|
|
|
|
_endpoint_incoming_template = """
|
|
async def {name}(
|
|
data: {model_var_name}.{class_name},
|
|
label: str,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
) -> JSONResponse:
|
|
logger.info(
|
|
'{name}(%s, %s, %s)',
|
|
repr(data),
|
|
repr(label),
|
|
repr({model_var_name}),
|
|
)
|
|
return await store_incoming_record(
|
|
'{collection}',
|
|
label,
|
|
data,
|
|
'{class_name}',
|
|
api_key,
|
|
)
|
|
"""
|
|
|
|
|
|
def create_collection(
    instance_state: InstanceState,
    configuration: Configuration,
    collection_name: str,
):
    """Create a collection instance as specified by its configuration.

    Existing on-disk structures (curated store, audit repositories) are
    reused if they are compatible. All compatibility checks run before
    anything is created on disk, so an incompatible setup raises without
    leaving new directories behind.

    :param instance_state: the instance state that receives the curated
        store, the incoming-store registry, schema info, collection info,
        validator, authentication sources, audit backends, and the dynamic
        endpoints of the collection.
    :param configuration: the abstract configuration of the service.
    :param collection_name: name of the collection in
        ``configuration.collections`` that should be created.
    :raises ConfigError: if an existing store or audit directory is not
        compatible with the configuration (raised by the compatibility
        checks), or for unsupported backend/authentication types.
    :raises ConfigCollisionError: if an existing record_dir store uses a
        different schema or mapping method.
    :raises OSError: if a missing directory cannot be created; directories
        created by this call are removed again before re-raising.
    """
    collection_configuration = configuration.collections[collection_name]
    curated_path = Path(instance_state.store_path / collection_configuration.curated)
    incoming_path = (
        None
        if collection_configuration.incoming is None
        else Path(instance_state.store_path / collection_configuration.incoming)
    )
    audit_paths = [
        Path(instance_state.store_path / audit_backend.path)
        for audit_backend in collection_configuration.audit_backends
    ]

    # Check for compatibility of all existing stores before creating any
    # structures on disk.
    if curated_path.exists():
        check_store_compatibility(
            curated_path,
            collection_configuration.backend,
            collection_configuration.schema_location,
        )
    for audit_path in audit_paths:
        if audit_path.exists():
            check_git_audit_compatibility(audit_path)

    # All existing structures are compatible with the collection
    # specification, create the missing ones.
    _create_missing_directories([curated_path, incoming_path, *audit_paths])

    # Create the curated store
    curated_store = create_store(
        abstract_configuration=configuration,
        instance_state=instance_state,
        collection_name=collection_name,
    )
    instance_state.curated_stores[collection_name] = curated_store

    # Incoming stores are created on demand when a token is authenticated
    instance_state.incoming_stores[collection_name] = {}

    # Create the schema modules, schema view, and conversion objects
    schema_location = collection_configuration.schema_location
    instance_state.schema_info[schema_location] = get_schema_info(schema_location)

    # Determine the active classes based on the classes defined in the schema
    # and the configuration of the collection
    active_classes = set(instance_state.schema_info[schema_location].classes)
    if collection_configuration.use_classes:
        active_classes &= set(collection_configuration.use_classes)
    if collection_configuration.ignore_classes:
        active_classes -= set(collection_configuration.ignore_classes)
    instance_state.collections[collection_name] = InstanceStateCollectionInfo(
        active_classes=active_classes,
        tag_info={},
    )

    # Create a validator for the collection
    instance_state.validators[collection_name] = FormatConverter(
        schema=collection_configuration.schema_location,
        input_format=Format.json,
        output_format=Format.ttl,
    )

    # Create the authentication sources
    for authentication_spec in collection_configuration.auth_sources:
        create_authentication_source(
            configuration,
            collection_name,
            authentication_spec,
            instance_state,
        )

    # Create the audit-backends
    instance_state.audit_backends[collection_name] = []
    for audit_backend_config in collection_configuration.audit_backends:
        instance_state.audit_backends[collection_name].append(
            GitAuditBackend(
                path=Path(instance_state.store_path / audit_backend_config.path),
                auto_flush_timeout=audit_backend_config.auto_flush_timeout,
            )
        )

    # Create the dynamic endpoints for record storing & validation, for
    # inbox-storing, and for curated area storing.
    create_endpoints_for_collection(
        instance_state,
        collection_name,
        collection_configuration,
        instance_state.fastapi_app,
    )


def _create_missing_directories(paths: list[Path | None]) -> list[Path]:
    """Create every missing directory in `paths`, all or nothing.

    `None` entries and already existing directories are skipped. If creating
    a directory fails, the directories created by this call are removed again
    (in reverse creation order, best effort) and the original exception is
    re-raised.

    :param paths: directories to create; `None` entries are ignored.
    :return: the directories that were created by this call.
    """
    created_directories: list[Path] = []
    try:
        for path in paths:
            if path is not None and not path.exists():
                path.mkdir(parents=True)
                created_directories.append(path)
    except Exception:
        # `mkdir` signals failures with `OSError` (permissions, races, ...).
        # Remove what we created so that a failed setup leaves no partial
        # structures behind; cleanup errors must not mask the original one.
        for directory in reversed(created_directories):
            shutil.rmtree(directory, ignore_errors=True)
        raise
    return created_directories
|
|
|
|
|
|
def create_authentication_source(
    abstract_configuration: Configuration,
    collection_name: str,
    authentication_spec: ConfigAuthSpec | ForgejoAuthSpec,
    instance_state: InstanceState,
):
    """Build the authentication source for `authentication_spec` and register it.

    The new source is appended to ``instance_state.auth_sources[collection_name]``.
    That list is created first if it does not exist yet, so the entry is
    present even when the spec type turns out to be unsupported.

    :param abstract_configuration: configuration passed to config-based sources.
    :param collection_name: collection the source belongs to.
    :param authentication_spec: either a `ConfigAuthSpec` or a `ForgejoAuthSpec`.
    :param instance_state: instance state holding the per-collection sources.
    :raises ConfigError: if the spec is of any other type.
    """
    registry = instance_state.auth_sources
    if collection_name not in registry:
        registry[collection_name] = []
    collection_sources = registry[collection_name]

    if isinstance(authentication_spec, ConfigAuthSpec):
        new_source = ConfigAuthenticationSource(
            abstract_configuration=abstract_configuration,
            collection_name=collection_name,
        )
    elif isinstance(authentication_spec, ForgejoAuthSpec):
        new_source = ForgejoAuthenticationSource(
            api_url=authentication_spec.url,
            organization=authentication_spec.organization,
            team=authentication_spec.team,
            label_type=authentication_spec.label_type,
            instance_id=authentication_spec.instance_id,
            repository=authentication_spec.repository,
        )
    else:
        raise ConfigError(
            f"Unsupported authentication config type: '{type(authentication_spec)}'"
        )

    collection_sources.append(new_source)
|
|
|
|
|
|
def write_record_dir_config(
    path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Write the record_dir configuration file into `path` unless it exists.

    The file records the schema location and the identifier mapping method
    (`idfx`) of the store. An existing configuration file is never
    overwritten.

    :param path: directory of the record_dir store.
    :param backend_config: record_dir backend configuration; provides the
        mapping method.
    :param schema: schema location that is stored in the file.
    :raises ConfigError: if `backend_config` is not a `RecordDirBackendConfig`.
    """
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with `-O`, which would let a wrong config type slip through.
    if not isinstance(backend_config, RecordDirBackendConfig):
        msg = f"Unsupported backend config type: '{type(backend_config)}'"
        raise ConfigError(msg)

    record_dir_config_file_path = path / record_dir_config_file_name
    if record_dir_config_file_path.exists():
        return

    # YAML is UTF-8; do not depend on the platform's locale encoding.
    record_dir_config_file_path.write_text(
        f"""# RecordDir Config
type: records
version: 1
schema: {schema}
format: yaml
idfx: {backend_config.mapping_method}
""",
        encoding='utf-8',
    )
|
|
|
|
|
|
def check_store_compatibility(
|
|
store_path: Path,
|
|
backend_config: RecordDirBackendConfig | SQLiteBackendConfig,
|
|
schema: str,
|
|
):
|
|
"""Check if an existing store is compatible with the specs in `backend_config`
|
|
|
|
:param store_path:
|
|
:param backend_config:
|
|
:param schema:
|
|
:return:
|
|
"""
|
|
if not store_path.exists():
|
|
return
|
|
if isinstance(backend_config, RecordDirBackendConfig):
|
|
check_record_dir_compatibility(store_path, backend_config, schema)
|
|
elif isinstance(backend_config, SQLiteBackendConfig):
|
|
check_sqlite_compatibility(store_path)
|
|
else:
|
|
msg = f"Unsupported backend config type: '{type(backend_config)}'"
|
|
raise ConfigError(msg)
|
|
return
|
|
|
|
|
|
def check_record_dir_compatibility(
    store_path: Path,
    backend_config: RecordDirBackendConfig,
    schema: str,
):
    """Check that an existing record_dir store matches `backend_config` and `schema`.

    Non-existing directories and "empty" directories are compatible. A
    directory counts as empty if it contains no entries, or only the
    record_dir index file.

    :param store_path: directory of the record_dir store.
    :param backend_config: record_dir backend configuration; provides the
        expected mapping method.
    :param schema: expected schema location.
    :raises ConfigCollisionError: if the existing store uses a different
        schema or a different mapping method.
    """
    # Non-existing record_dir-directories are compatible
    if not store_path.exists():
        return

    # Use `os.scandir` as a context manager so that the directory handle is
    # closed deterministically instead of at garbage collection.
    with os.scandir(store_path) as dir_entries:
        entry_names = {dir_entry.name for dir_entry in dir_entries}
    # Empty, or containing only the index file, counts as empty.
    if entry_names <= {index_file_name}:
        return

    record_dir_config = get_record_dir_config(store_path)
    if record_dir_config.schema_location != schema:
        msg = f"Existing backend uses a different schema: '{record_dir_config.schema_location}'"
        raise ConfigCollisionError(msg)

    stored_mapping_method = record_dir_config.idfx.value
    if stored_mapping_method != backend_config.mapping_method:
        msg = f"Configuration specifies mapping method '{backend_config.mapping_method}', existing backend uses mapping method: '{stored_mapping_method}'"
        raise ConfigCollisionError(msg)
|
|
|
|
|
|
def check_sqlite_compatibility(
    store_path: Path,
):
    """Ensure an existing SQLite store directory contains its database file.

    :param store_path: directory of the SQLite store.
    :raises ConfigError: if the database file is missing.
    """
    if Path(store_path / sqlite_db_filename).exists():
        return
    raise ConfigError('No sqlite database found in existing store')
|
|
|
|
|
|
def check_git_audit_compatibility(
    audit_path: Path,
):
    """Check if an existing audit path is compatible with a git audit store.

    Non-existing and empty directories are valid gitaudit locations. A
    non-empty directory must be a bare git repository.

    :param audit_path: location of the git audit store.
    :raises ConfigError: if `audit_path` is non-empty and either not a git
        repository or not a bare one.
    """
    # Non-existing or empty directories are valid gitaudit-locations
    if not audit_path.exists():
        return
    # Only look at the first entry to decide emptiness, and close the
    # directory handle deterministically via the context manager.
    with os.scandir(audit_path) as dir_entries:
        if next(dir_entries, None) is None:
            return

    # A non-empty directory should contain a bare git repository
    try:
        result = call_git_oneline(
            ['rev-parse', '--is-bare-repository'],
            cwd=audit_path,
            force_c_locale=True,
        )
    except CommandError as ce:
        raise ConfigError(f'No git repository in gitaudit-path: {audit_path}') from ce
    if result.strip().lower() != 'true':
        raise ConfigError(f'No bare git repository in gitaudit-path: {audit_path}')
|
|
|
|
|
|
def create_endpoint(
    operation_name: str,
    operation_path: str,
    instance_state: InstanceState,
    collection_name: str,
    collection_config: CollectionConfig,
    template: str,
    handler: str,
    tag_group: str,
    tag_name: str,
    app: FastAPI,
):
    """Generate and register the POST endpoints of one operation for a collection.

    For every active class of the collection, `template` is filled in with
    `str.format`, executed in this module's globals, and the resulting
    function is registered as ``POST /<collection_name>/<operation_path>/<class_name>``.

    :param operation_name: operation name used in generated function names,
        route names, and log messages.
    :param operation_path: path fragment between collection name and class
        name; may contain path parameters such as ``{label}``.
    :param instance_state: instance state; the collection's `tag_info` is
        updated and its `active_classes` determine the generated endpoints.
    :param collection_name: name of the collection.
    :param collection_config: collection configuration; provides the schema
        location.
    :param template: endpoint source template.
    :param handler: name of the handler the template calls (only used by
        templates with a ``{handler}`` placeholder).
    :param tag_group: key under which `tag_name` is stored in `tag_info`.
    :param tag_name: OpenAPI tag of the generated routes.
    :param app: application the routes are added to.
    """
    logger.info(
        'Creating %s-endpoints for collection: "%s"',
        operation_name,
        collection_name,
    )

    collection_info = instance_state.collections[collection_name]
    collection_info.tag_info[tag_group] = tag_name

    # TODO: get schema_info from instance_state!?
    model, _, model_var_name = get_model_for_schema(collection_config.schema_location)
    # The generated source refers to the model module by name, so it has to
    # be reachable from the globals that `exec` uses below.
    globals()[model_var_name] = model

    active_classes = collection_info.active_classes
    # Iterate in sorted order: `active_classes` is a set, and a stable order
    # keeps route registration and the OpenAPI document deterministic.
    for class_name in sorted(active_classes):
        endpoint_name = f'_endpoint_{var_escape(collection_name)}_{operation_name}_{class_name}'
        endpoint_source = template.format(
            name=endpoint_name,
            model_var_name=model_var_name,
            class_name=class_name,
            collection=collection_name,
            info=f"'{operation_name} {collection_name}/{class_name} objects'",
            handler=handler,
        )
        exec(endpoint_source, globals())  # noqa S102

        # Create an API route for the endpoint
        app.add_api_route(
            path=f'/{collection_name}/{operation_path}/{class_name}',
            endpoint=globals()[endpoint_name],
            methods=['POST'],
            name=f'{operation_name} "{class_name}" object (schema: {model.linkml_meta["id"]})',
            response_model=None,
            tags=[tag_name],
        )

    # FastAPI caches the generated OpenAPI document in `app.openapi_schema`.
    # Drop the cache so routes added at runtime show up in `/openapi.json`
    # and the interactive docs.
    app.openapi_schema = None

    logger.info(
        'Creation of %d %s-endpoints completed.',
        len(active_classes),
        operation_name,
    )
|
|
|
|
|
|
def create_endpoints_for_collection(
    instance_state: InstanceState,
    collection_name: str,
    collection_config: CollectionConfig,
    app: FastAPI,
):
    """Register all dynamic endpoints of `collection_name` on `app`.

    Four operations are generated: storing, validating, storing in the
    curated area, and storing in a labeled incoming area. Each one is
    delegated to `create_endpoint`.
    """
    # (operation name, path fragment, source template, handler name,
    #  tag group, tag name) -- one entry per generated operation.
    endpoint_specs = [
        ('store', 'record', _endpoint_template, 'store_record', 'write', f'Write records to collection "{collection_name}"'),
        ('validate', 'validate/record', _endpoint_template, 'validate_record', 'validate', f'Validate records for collection "{collection_name}"'),
        ('curated', 'curated/record', _endpoint_curated_template, 'store_curated_record', 'curated_write', f'Curated area: store records in curated area of collection "{collection_name}"'),
        ('incoming', 'incoming/{label}/record', _endpoint_incoming_template, 'store_incoming_record', 'incoming_write', f'Incoming area: store records in incoming area "{{label}}" of collection "{collection_name}"'),
    ]
    for op_name, op_path, source_template, handler_name, group, tag in endpoint_specs:
        create_endpoint(
            operation_name=op_name,
            operation_path=op_path,
            instance_state=instance_state,
            collection_name=collection_name,
            collection_config=collection_config,
            template=source_template,
            handler=handler_name,
            tag_group=group,
            tag_name=tag,
            app=app,
        )
|
|
|
|
|
|
def delete_endpoints_for_collection(
    instance_state: InstanceState,
    collection_name: str,
):
    """Remove every dynamically generated endpoint of `collection_name`.

    The operation paths mirror those used when the endpoints are created:
    store, validate, curated, and incoming.
    """
    classes_to_remove = instance_state.collections[collection_name].active_classes
    operation_paths = (
        'record',
        'validate/record',
        'curated/record',
        'incoming/{label}/record',
    )
    for path_fragment in operation_paths:
        delete_endpoint(
            collection_name=collection_name,
            active_classes=classes_to_remove,
            operation_path=path_fragment,
            app=instance_state.fastapi_app,
        )
|
|
|
|
|
|
def delete_endpoint(
    collection_name: str,
    active_classes: set[str],
    operation_path: str,
    app: FastAPI,
):
    """Remove the generated routes of one operation of a collection from `app`.

    A route is removed if its path is
    ``/<collection_name>/<operation_path>/<class_name>`` for one of
    `active_classes`. Entries of the route list that have no `path`
    attribute (e.g. included routers) are kept. Collection-specific endpoints
    are always path-carrying `APIRoute`/`Route` instances.

    :param collection_name: name of the collection.
    :param active_classes: class names whose routes are removed.
    :param operation_path: path fragment of the operation, e.g. ``'record'``.
    :param app: application to remove the routes from.
    """
    remove_paths_set = {
        f'/{collection_name}/{operation_path}/{class_name}'
        for class_name in active_classes
    }

    # `getattr` with a default skips path-less entries without depending on
    # FastAPI's private router classes. The list is rebuilt in place so that
    # the router keeps the same list object.
    app.router.routes[:] = [
        route
        for route in app.router.routes
        if getattr(route, 'path', None) not in remove_paths_set
    ]

    # FastAPI caches the generated OpenAPI document; drop it so removed routes
    # disappear from `/openapi.json` and the interactive docs.
    app.openapi_schema = None
|
|
|
|
|
|
def store_record(
    collection: str,
    data: BaseModel | str,
    class_name: str,
    model: Any,
    input_format: Format,
    api_key: str | None = Depends(api_key_header_scheme),
) -> JSONResponse | PlainTextResponse:
    """Validate `data` and store it in the token-specific store of `collection`.

    This is the handler that the generated "store" endpoints call.

    :param collection: name of the target collection.
    :param data: the record, either a pydantic model (JSON input) or TTL text.
    :param class_name: schema class of the record.
    :param model: schema module that provides the pydantic classes.
    :param input_format: format of `data`; TTL input also yields a TTL
        response.
    :param api_key: submitted token; if `None`, the collection's default
        token representation is used.
    :return: the stored records as JSON, or as combined Turtle text
        (``text/turtle``) for TTL input.
    :raises HTTPException: 400 if `data` does not match `input_format`, 401
        without a usable token, 403 without `incoming_write` permission, and
        422 for conversion or validation errors.
    """
    is_ttl = input_format == Format.ttl

    # The payload type must agree with the declared input format.
    if input_format == Format.json and isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid JSON data provided.'
        )
    if is_ttl and not isinstance(data, str):
        raise HTTPException(
            status_code=HTTP_400_BAD_REQUEST, detail='Invalid ttl data provided.'
        )

    instance_state = get_instance_state()
    abstract_config = read_config(instance_state.store_path)
    check_collection(abstract_config, collection)

    if api_key is None:
        token_representation = get_default_token_representation(
            abstract_config,
            collection,
        )
    else:
        token_representation = api_key
    if not token_representation:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail=f'Not authorized to submit to collection "{collection}"',
        )

    # Resolve the token's store and permissions. Plaintext tokens are mapped
    # to their hashed representation here, because all token information is
    # kept under the hashed form. The default permissions are merged in.
    store, token_permissions, user_id = get_token_store(
        abstract_config,
        instance_state,
        collection,
        token_representation,
    )
    effective_permissions = join_default_token_permissions(
        abstract_config,
        instance_state,
        token_permissions,
        collection,
    )
    if not effective_permissions.incoming_write:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=f"Not authorized to submit to collection '{collection}'.",
        )

    if not is_ttl:
        record = data
    else:
        with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Conversion error'):
            json_object = FormatConverter(
                abstract_config.collections[collection].schema_location,
                input_format=Format.ttl,
                output_format=Format.json,
            ).convert(data, class_name)
        with wrap_http_exception(ValidationError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
            record = TypeAdapter(getattr(model, class_name)).validate_python(json_object)

    with wrap_http_exception(ValueError, status_code=HTTP_422_UNPROCESSABLE_CONTENT, header='Validation error'):
        instance_state.validators[collection].validate(record)

    with wrap_http_exception(CurieResolutionError):
        stored_records = store.store_object(obj=record, submitter=user_id)

    if not is_ttl:
        return JSONResponse([stored for _, stored in stored_records])

    ttl_converter = FormatConverter(
        abstract_config.collections[collection].schema_location,
        input_format=Format.json,
        output_format=Format.ttl,
    )
    with wrap_http_exception(ValueError, header='Conversion error'):
        turtle_documents = [
            ttl_converter.convert(stored, stored_class)
            for stored_class, stored in stored_records
        ]
        return PlainTextResponse(
            combine_ttl(turtle_documents),
            media_type='text/turtle',
        )
|