Verify that incoming paths exist when a collection is created a token has write-access to the collection. Checks that incoming-label is set for the token. Add tests for incoming path validation.
240 lines
7.3 KiB
Python
240 lines
7.3 KiB
Python
import logging
|
|
from pathlib import (
|
|
Path,
|
|
PurePosixPath,
|
|
)
|
|
from typing import Literal
|
|
from urllib.parse import quote
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Depends,
|
|
HTTPException,
|
|
Response,
|
|
)
|
|
from pydantic import BaseModel
|
|
|
|
from dump_things_service import (
|
|
HTTP_201_CREATED,
|
|
HTTP_404_NOT_FOUND,
|
|
HTTP_406_NOT_ACCEPTABLE,
|
|
HTTP_409_CONFLICT,
|
|
reserved_collection_names,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
Configuration,
|
|
CollectionConfig,
|
|
StrictModel,
|
|
store_config,
|
|
get_config, get_token_permissions,
|
|
)
|
|
from dump_things_service.admin import authenticate_admin
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from dump_things_service.instance_state import get_instance_state, InstanceState
|
|
from dump_things_service.manifest import manifest_configuration
|
|
from dump_things_service.exceptions import ConfigError
|
|
from dump_things_service.utils import wrap_http_exception
|
|
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class ConfigAuthSpec(BaseModel):
|
|
type: Literal['config'] = 'config'
|
|
|
|
|
|
class ForgejoAuthSpec(BaseModel):
|
|
type: Literal['forgejo']
|
|
url: str
|
|
organization: str
|
|
team: str
|
|
label_type: Literal['team', 'user']
|
|
repository: str | None = None
|
|
|
|
|
|
class TagSpec(BaseModel):
|
|
submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
|
|
submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
|
|
|
|
|
|
from pydantic import ConfigDict, Field
|
|
from dump_things_service.abstract_config import RecordDirBackendConfig, SQLiteBackendConfig, GitAuditBackendConfig
|
|
|
|
class CollectionRequest(CollectionConfig):
|
|
name: str
|
|
|
|
|
|
@router.post(
|
|
'/collections',
|
|
tags=['Administration interface'],
|
|
name='Create a new collection',
|
|
status_code=HTTP_201_CREATED,
|
|
)
|
|
async def create_collection(
|
|
response: Response,
|
|
body: CollectionRequest,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
):
|
|
|
|
instance_state = get_instance_state()
|
|
abstract_config = get_config()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_state, abstract_config, api_key)
|
|
|
|
# Check for existing collection name
|
|
if body.name in abstract_config.collections:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"Collection with name '{body.name}' already exists.",
|
|
)
|
|
|
|
# Check for reserved collection names
|
|
if body.name in reserved_collection_names:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"Collection name '{body.name}' is reserved and cannot be created.",
|
|
)
|
|
|
|
# Check for distinct directories
|
|
for directory in (body.incoming, body.curated):
|
|
if directory:
|
|
ensure_unique_directory(
|
|
abstract_config,
|
|
instance_state,
|
|
directory,
|
|
)
|
|
|
|
# Check for incoming directory if any of the tokens allows writing
|
|
validate_incoming_paths(abstract_config, body)
|
|
|
|
# Update the abstract configuration
|
|
abstract_config.collections[body.name] = body
|
|
|
|
# Manifest the abstract configuration
|
|
with wrap_http_exception(ConfigError):
|
|
manifest_configuration(abstract_config, instance_state)
|
|
|
|
# Persist the abstract configuration
|
|
store_config(
|
|
store_path=instance_state.store_path,
|
|
config=abstract_config,
|
|
)
|
|
|
|
response.headers['Location'] = f'/collections/{quote(body.name)}'
|
|
|
|
|
|
@router.get(
|
|
'/collections',
|
|
tags=['Administration interface'],
|
|
name='Get existing collections',
|
|
)
|
|
async def get_collections(
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
) -> dict[str, CollectionConfig]:
|
|
|
|
instance_state = get_instance_state()
|
|
abstract_config = get_config()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_state, abstract_config, api_key)
|
|
return abstract_config.collections
|
|
|
|
|
|
@router.get(
|
|
'/collections/{collection_name}',
|
|
tags=['Administration interface'],
|
|
name='Get existing collection by name',
|
|
)
|
|
async def get_collection_with_name(
|
|
collection_name: str,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
) -> CollectionConfig:
|
|
|
|
instance_state = get_instance_state()
|
|
abstract_config = get_config()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_state, abstract_config, api_key)
|
|
|
|
if collection_name not in abstract_config.collections:
|
|
raise HTTPException(
|
|
status_code=HTTP_404_NOT_FOUND,
|
|
detail=f"Collection with name '{collection_name}' does not exist.",
|
|
)
|
|
return abstract_config.collections[collection_name]
|
|
|
|
|
|
@router.delete(
|
|
'/collections/{collection_name}',
|
|
tags=['Administration interface'],
|
|
name='Delete collection with name',
|
|
)
|
|
async def delete_collection(
|
|
collection_name: str,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
):
|
|
|
|
instance_state = get_instance_state()
|
|
abstract_config = get_config()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_state, abstract_config, api_key)
|
|
|
|
if collection_name not in abstract_config.collections:
|
|
raise HTTPException(
|
|
status_code=HTTP_404_NOT_FOUND,
|
|
detail=f"Collection with name '{collection_name}' does not exist.",
|
|
)
|
|
|
|
# Update the abstract configuration
|
|
del abstract_config.collections[collection_name]
|
|
|
|
# Manifest the abstract configuration
|
|
with wrap_http_exception(ConfigError):
|
|
manifest_configuration(abstract_config, instance_state)
|
|
|
|
# Persist the abstract configuration
|
|
store_config(
|
|
store_path=instance_state.store_path,
|
|
config=abstract_config,
|
|
)
|
|
|
|
|
|
def ensure_unique_directory(
|
|
abstract_config: Configuration,
|
|
instance_state: InstanceState,
|
|
existing_dir: PurePosixPath,
|
|
):
|
|
abs_existing_dir = (instance_state.store_path / Path(existing_dir)).absolute()
|
|
for collection_name, collection_config in abstract_config.collections.items():
|
|
for collection_dir in collection_config.curated, collection_config.incoming:
|
|
abs_collection_dir = (instance_state.store_path / Path(collection_dir)).absolute()
|
|
if abs_collection_dir == abs_existing_dir:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"Directory '{collection_dir}' already used by collection '{collection_name}'.",
|
|
)
|
|
|
|
|
|
def validate_incoming_paths(
|
|
abstract_config: Configuration,
|
|
collection_request: CollectionRequest,
|
|
):
|
|
for token_name, token_info in abstract_config.tokens.items():
|
|
token_collection_info = token_info.collections.get(collection_request.name)
|
|
if token_collection_info:
|
|
token_permissions = get_token_permissions(token_collection_info.mode)
|
|
if token_permissions.incoming_write or token_permissions.zones_access:
|
|
if not collection_request.incoming:
|
|
detail = (
|
|
f"Cannot add collection '{collection_request.name}' without "
|
|
f"`incoming` path, because at least token '{token_name}' "
|
|
f" has write access to the collection"
|
|
)
|
|
raise HTTPException(
|
|
status_code=HTTP_406_NOT_ACCEPTABLE,
|
|
detail=detail,
|
|
)
|