dump-things-server/dump_things_service/collection_endpoints.py
Christian Monch 0225d8647b
Some checks failed
Test execution / Test-all (push) Failing after 1m34s
[temp] adjust tests
2026-05-06 17:22:44 +02:00

169 lines
4.6 KiB
Python

import logging
import random
import sys
from pathlib import (
Path,
PurePosixPath,
)
from typing import (
Literal,
cast,
)
from urllib.parse import quote
from fastapi import (
APIRouter,
Depends,
HTTPException,
Response,
)
from pydantic import (
BaseModel,
ConfigDict,
)
from dump_things_service import (
HTTP_201_CREATED,
HTTP_409_CONFLICT,
)
from dump_things_service.abstract_config import (
read_config,
store_config,
CollectionConfig,
Configuration,
)
from dump_things_service.admin import authenticate_admin
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.instance_state import get_instance_state
from dump_things_service.manifest import manifest_configuration
from dump_things_service.exceptions import ConfigError
from dump_things_service.utils import wrap_http_exception
logger = logging.getLogger('dump_things_service')
router = APIRouter()
class ConfigAuthSpec(BaseModel):
type: Literal['config'] = 'config'
class ForgejoAuthSpec(BaseModel):
type: Literal['forgejo']
url: str
organization: str
team: str
label_type: Literal['team', 'user']
repository: str | None = None
class TagSpec(BaseModel):
submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
@router.post(
'/collections',
tags=['Administration interface'],
name='Create a new collection',
status_code=HTTP_201_CREATED,
)
async def create_collection(
response: Response,
body: CollectionConfig,
api_key: str = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
# Check admin rights
authenticate_admin(instance_state, api_key)
# TODO: read the current abstract configuration, check for a collection
# of the given name. If it does not exist yet, add a collection
# configuration that reflects the `body`. Then try to manifest the
# new configuration. If there are no errors, persist the new
# configuration.
configuration: Configuration = read_config(
store_path=instance_state.store_path
)
# Check for existing collection name
if body.name in configuration.collections:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"Collection with name '{body.name}' already exists.",
)
# Update the abstract configuration
configuration.collections[body.name] = body
# Manifest the abstract configuration
with wrap_http_exception(ConfigError):
manifest_configuration(configuration, instance_state)
# Persist the configuration
store_config(
store_path=instance_state.store_path,
config=configuration,
)
response.headers['Location'] = f'/collections/{quote(body.name)}'
@router.get(
'/collections',
tags=['Administration interface'],
name='Get existing collections',
)
async def get_collections(
api_key: str = Depends(api_key_header_scheme),
) -> list[CollectionConfig]:
instance_config = get_config()
# Check admin rights
authenticate_admin(instance_config, api_key)
abstract_config = read_config(store_path=instance_config.store_path)
return list(abstract_config.collections.values())
x = """
def create_or_reuse(
instance_config: InstanceConfig,
local_path: PurePosixPath,
schema_location: str,
backend_spec: str,
):
full_path = Path(instance_config.store_path / local_path)
if full_path.exists():
ensure_backend_type(full_path, schema_location, backend_spec)
else:
full_path.mkdir(parents=True, exist_ok=False)
create_backend(instance_config, full_path, schema_location, backend_spec)
def ensure_backend_type(
path: Path,
schema_location: str,
backend_spec: str,
):
backend_name, extension = get_backend_and_extension(backend_spec)
if backend_name is 'record_dir':
try:
config = Config.get_record_dir_config(path)
except ConfigError as e:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"did not find record_dir store in '{path}', reason: {e}",
) from e
if config.schema != schema_location:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"existing record_dir store has different schema: '{config.schema}'",
)
elif backend_name is 'sqlite':
pass
else:
raise ValueError(f"unknown backend type: '{backend_spec}'")
"""