dump-things-server/dump_things_service/collection_endpoints.py
Christian Monch a753538b4c improve incoming-path verification
Verify that incoming paths exist when a collection
is created and a token has write-access to the
collection. Check that incoming-label is set
for the token.

Add tests for incoming path validation.
2026-06-12 12:41:54 +02:00

240 lines
7.3 KiB
Python

import logging
from pathlib import (
Path,
PurePosixPath,
)
from typing import Literal
from urllib.parse import quote
from fastapi import (
APIRouter,
Depends,
HTTPException,
Response,
)
from pydantic import BaseModel
from dump_things_service import (
HTTP_201_CREATED,
HTTP_404_NOT_FOUND,
HTTP_406_NOT_ACCEPTABLE,
HTTP_409_CONFLICT,
reserved_collection_names,
)
from dump_things_service.abstract_config import (
Configuration,
CollectionConfig,
StrictModel,
store_config,
get_config, get_token_permissions,
)
from dump_things_service.admin import authenticate_admin
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.instance_state import get_instance_state, InstanceState
from dump_things_service.manifest import manifest_configuration
from dump_things_service.exceptions import ConfigError
from dump_things_service.utils import wrap_http_exception
# Module-level logger, obtained under the name 'dump_things_service'.
logger = logging.getLogger('dump_things_service')
# Router on which the collection endpoints defined below are registered via
# the `@router.<method>` decorators.
router = APIRouter()
# Authentication specification of type 'config'.
# The only field, `type`, is a fixed tag that accepts just the literal
# 'config' and defaults to it.
# NOTE(review): not referenced anywhere else in the visible part of this
# module -- confirm whether it is still needed here.
class ConfigAuthSpec(BaseModel):
    type: Literal['config'] = 'config'
# Authentication specification of type 'forgejo'.
# Every field except `repository` is required; `type` has no default and
# must be given as the literal 'forgejo'.
# NOTE(review): how these values are consumed is not visible in this module;
# the field meanings are presumably those suggested by their names -- confirm
# against the Forgejo authentication code.
class ForgejoAuthSpec(BaseModel):
    type: Literal['forgejo']
    url: str
    organization: str
    team: str
    # Restricted to the two literal values 'team' and 'user'.
    label_type: Literal['team', 'user']
    # Optional; `None` when not given.
    repository: str | None = None
# Pair of tag identifiers; both fields are strings that default to an IRI.
# NOTE(review): presumably these tags annotate records with the submitter and
# the submission time -- their use is not visible in this module; confirm.
class TagSpec(BaseModel):
    # Defaults to the OBO PURL for NCIT_C54269.
    submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
    # Defaults to the semanticscience.org resource SIO_001083.
    submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
from pydantic import ConfigDict, Field
from dump_things_service.abstract_config import RecordDirBackendConfig, SQLiteBackendConfig, GitAuditBackendConfig
# Request body of `POST /collections`: all fields of `CollectionConfig` plus
# the name of the collection that should be created.
class CollectionRequest(CollectionConfig):
    # Key under which `create_collection` stores the collection in
    # `Configuration.collections`.
    name: str
# Create a new collection from the request body.
#
# Steps, in order:
#   1. run the admin check for the given API key (`authenticate_admin`),
#   2. reject names that already exist or are reserved (409),
#   3. reject `incoming`/`curated` directories that are already used by
#      another collection (409),
#   4. require an `incoming` path if a token has write access to the
#      collection (406),
#   5. add the collection to the abstract configuration, manifest it and
#      persist the configuration.
# On success, the `Location` response header points to the new collection.
#
# NOTE: documented with comments instead of a docstring because FastAPI
# publishes endpoint docstrings as the OpenAPI description of the route.
@router.post(
    '/collections',
    tags=['Administration interface'],
    name='Create a new collection',
    status_code=HTTP_201_CREATED,
)
async def create_collection(
    response: Response,
    body: CollectionRequest,
    api_key: str = Depends(api_key_header_scheme),
):
    instance_state = get_instance_state()
    abstract_config = get_config()

    # Check admin rights
    authenticate_admin(instance_state, abstract_config, api_key)

    # Check for existing collection name
    if body.name in abstract_config.collections:
        raise HTTPException(
            status_code=HTTP_409_CONFLICT,
            detail=f"Collection with name '{body.name}' already exists.",
        )

    # Check for reserved collection names
    if body.name in reserved_collection_names:
        raise HTTPException(
            status_code=HTTP_409_CONFLICT,
            detail=f"Collection name '{body.name}' is reserved and cannot be created.",
        )

    # Check for distinct directories (unset directories are skipped)
    for directory in (body.incoming, body.curated):
        if directory:
            ensure_unique_directory(
                abstract_config,
                instance_state,
                directory,
            )

    # Check for incoming directory if any of the tokens allows writing
    validate_incoming_paths(abstract_config, body)

    # Update the abstract configuration
    abstract_config.collections[body.name] = body

    # Manifest the abstract configuration. If manifesting fails, remove the
    # collection from the in-memory configuration again before re-raising.
    # Otherwise a collection that was never persisted would stay registered:
    # it would be listed by the GET endpoints and a corrected retry would be
    # rejected with "already exists".
    try:
        with wrap_http_exception(ConfigError):
            manifest_configuration(abstract_config, instance_state)
    except Exception:
        abstract_config.collections.pop(body.name, None)
        raise

    # Persist the abstract configuration
    store_config(
        store_path=instance_state.store_path,
        config=abstract_config,
    )

    response.headers['Location'] = f'/collections/{quote(body.name)}'
# Return the mapping of all configured collections, keyed by collection name.
# The admin check (`authenticate_admin`) runs before anything is returned.
#
# NOTE: documented with comments instead of a docstring because FastAPI
# publishes endpoint docstrings as the OpenAPI description of the route.
@router.get(
    '/collections',
    tags=['Administration interface'],
    name='Get existing collections',
)
async def get_collections(
    api_key: str = Depends(api_key_header_scheme),
) -> dict[str, CollectionConfig]:
    state = get_instance_state()
    config = get_config()

    # Listing collections is an administrative operation
    authenticate_admin(state, config, api_key)

    return config.collections
# Return the configuration of the collection called `collection_name`.
# The admin check (`authenticate_admin`) runs first; an unknown collection
# name is answered with 404.
#
# NOTE: documented with comments instead of a docstring because FastAPI
# publishes endpoint docstrings as the OpenAPI description of the route.
@router.get(
    '/collections/{collection_name}',
    tags=['Administration interface'],
    name='Get existing collection by name',
)
async def get_collection_with_name(
    collection_name: str,
    api_key: str = Depends(api_key_header_scheme),
) -> CollectionConfig:
    state = get_instance_state()
    config = get_config()

    # Reading a collection configuration is an administrative operation
    authenticate_admin(state, config, api_key)

    # Single lookup; a missing key is reported as 404
    found = config.collections.get(collection_name)
    if found is None:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"Collection with name '{collection_name}' does not exist.",
        )
    return found
# Delete the collection called `collection_name` from the configuration.
# The admin check (`authenticate_admin`) runs first; an unknown collection
# name is answered with 404. After the removal the configuration is
# manifested and persisted.
#
# NOTE: documented with comments instead of a docstring because FastAPI
# publishes endpoint docstrings as the OpenAPI description of the route.
@router.delete(
    '/collections/{collection_name}',
    tags=['Administration interface'],
    name='Delete collection with name',
)
async def delete_collection(
    collection_name: str,
    api_key: str = Depends(api_key_header_scheme),
):
    instance_state = get_instance_state()
    abstract_config = get_config()

    # Check admin rights
    authenticate_admin(instance_state, abstract_config, api_key)

    if collection_name not in abstract_config.collections:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"Collection with name '{collection_name}' does not exist.",
        )

    # Update the abstract configuration; keep the removed entry so that the
    # removal can be undone if manifesting fails
    removed = abstract_config.collections.pop(collection_name)

    # Manifest the abstract configuration. On failure, put the collection
    # back before re-raising, so that the in-memory configuration does not
    # diverge from the persisted one (the entry is re-added at the end of
    # the mapping).
    try:
        with wrap_http_exception(ConfigError):
            manifest_configuration(abstract_config, instance_state)
    except Exception:
        abstract_config.collections[collection_name] = removed
        raise

    # Persist the abstract configuration
    store_config(
        store_path=instance_state.store_path,
        config=abstract_config,
    )
def ensure_unique_directory(
    abstract_config: Configuration,
    instance_state: InstanceState,
    existing_dir: PurePosixPath,
):
    """Ensure that `existing_dir` is not used by any configured collection.

    `existing_dir` and the `curated` and `incoming` directories of every
    collection in `abstract_config` are joined onto
    `instance_state.store_path`, made absolute, and compared for equality.
    The comparison is purely lexical: `Path.absolute()` neither resolves
    symlinks nor collapses `..` components.

    Args:
        abstract_config: Configuration whose collections are checked.
        instance_state: Provides `store_path`, the base of all directories.
        existing_dir: Directory that must not be in use yet.

    Raises:
        HTTPException: With status 409 if a collection already uses the
            directory.
    """
    store_path = instance_state.store_path
    abs_existing_dir = (store_path / Path(existing_dir)).absolute()
    for collection_name, collection_config in abstract_config.collections.items():
        for collection_dir in (collection_config.curated, collection_config.incoming):
            # A collection may be configured without an `incoming` path. An
            # unset directory cannot clash with anything, and passing `None`
            # to `Path` would raise a `TypeError`.
            if not collection_dir:
                continue
            abs_collection_dir = (store_path / Path(collection_dir)).absolute()
            if abs_collection_dir == abs_existing_dir:
                raise HTTPException(
                    status_code=HTTP_409_CONFLICT,
                    detail=f"Directory '{collection_dir}' already used by collection '{collection_name}'.",
                )
def validate_incoming_paths(
    abstract_config: Configuration,
    collection_request: CollectionRequest,
):
    """Reject a collection without `incoming` path if a token can write to it.

    Every token in `abstract_config.tokens` that has an entry for the
    requested collection is inspected. If the permissions derived from the
    entry's mode grant `incoming_write` or `zones_access` and the request
    does not set `incoming`, the request is rejected.

    Args:
        abstract_config: Configuration whose tokens are inspected.
        collection_request: The collection that is about to be created.

    Raises:
        HTTPException: With status 406, naming the first offending token.
    """
    requested_name = collection_request.name
    for token_name, token_info in abstract_config.tokens.items():
        access = token_info.collections.get(requested_name)
        if not access:
            # Token is not associated with the requested collection
            continue

        permissions = get_token_permissions(access.mode)
        needs_incoming = permissions.incoming_write or permissions.zones_access
        if not needs_incoming or collection_request.incoming:
            continue

        raise HTTPException(
            status_code=HTTP_406_NOT_ACCEPTABLE,
            detail=(
                f"Cannot add collection '{requested_name}' without "
                f"`incoming` path, because at least token '{token_name}' "
                f" has write access to the collection"
            ),
        )