169 lines
4.6 KiB
Python
169 lines
4.6 KiB
Python
import logging
|
|
import random
|
|
import sys
|
|
from pathlib import (
|
|
Path,
|
|
PurePosixPath,
|
|
)
|
|
from typing import (
|
|
Literal,
|
|
cast,
|
|
)
|
|
from urllib.parse import quote
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Depends,
|
|
HTTPException,
|
|
Response,
|
|
)
|
|
from pydantic import (
|
|
BaseModel,
|
|
ConfigDict,
|
|
)
|
|
|
|
from dump_things_service import (
|
|
HTTP_201_CREATED,
|
|
HTTP_409_CONFLICT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
read_config,
|
|
store_config,
|
|
CollectionConfig,
|
|
Configuration,
|
|
)
|
|
from dump_things_service.admin import authenticate_admin
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from dump_things_service.instance_state import get_instance_state
|
|
from dump_things_service.manifest import manifest_configuration
|
|
from dump_things_service.exceptions import ConfigError
|
|
from dump_things_service.utils import wrap_http_exception
|
|
|
|
logger = logging.getLogger('dump_things_service')
|
|
router = APIRouter()
|
|
|
|
|
|
class ConfigAuthSpec(BaseModel):
|
|
type: Literal['config'] = 'config'
|
|
|
|
|
|
class ForgejoAuthSpec(BaseModel):
|
|
type: Literal['forgejo']
|
|
url: str
|
|
organization: str
|
|
team: str
|
|
label_type: Literal['team', 'user']
|
|
repository: str | None = None
|
|
|
|
|
|
class TagSpec(BaseModel):
|
|
submitter_id_tag: str = 'http://purl.obolibrary.org/obo/NCIT_C54269'
|
|
submission_time_tag: str = 'http://semanticscience.org/resource/SIO_001083'
|
|
|
|
|
|
@router.post(
|
|
'/collections',
|
|
tags=['Administration interface'],
|
|
name='Create a new collection',
|
|
status_code=HTTP_201_CREATED,
|
|
)
|
|
async def create_collection(
|
|
response: Response,
|
|
body: CollectionConfig,
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
):
|
|
|
|
instance_state = get_instance_state()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_state, api_key)
|
|
|
|
# TODO: read the current abstract configuration, check for a collection
|
|
# of the given name. If it does not exist yet, add a collection
|
|
# configuration that reflects the `body`. Then try to manifest the
|
|
# new configuration. If there are no errors, persist the new
|
|
# configuration.
|
|
configuration: Configuration = read_config(
|
|
store_path=instance_state.store_path
|
|
)
|
|
|
|
# Check for existing collection name
|
|
if body.name in configuration.collections:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"Collection with name '{body.name}' already exists.",
|
|
)
|
|
|
|
# Update the abstract configuration
|
|
configuration.collections[body.name] = body
|
|
|
|
# Manifest the abstract configuration
|
|
with wrap_http_exception(ConfigError):
|
|
manifest_configuration(configuration, instance_state)
|
|
|
|
# Persist the configuration
|
|
store_config(
|
|
store_path=instance_state.store_path,
|
|
config=configuration,
|
|
)
|
|
|
|
response.headers['Location'] = f'/collections/{quote(body.name)}'
|
|
|
|
|
|
@router.get(
|
|
'/collections',
|
|
tags=['Administration interface'],
|
|
name='Get existing collections',
|
|
)
|
|
async def get_collections(
|
|
api_key: str = Depends(api_key_header_scheme),
|
|
) -> list[CollectionConfig]:
|
|
|
|
instance_config = get_config()
|
|
|
|
# Check admin rights
|
|
authenticate_admin(instance_config, api_key)
|
|
|
|
abstract_config = read_config(store_path=instance_config.store_path)
|
|
return list(abstract_config.collections.values())
|
|
|
|
|
|
x = """
|
|
def create_or_reuse(
|
|
instance_config: InstanceConfig,
|
|
local_path: PurePosixPath,
|
|
schema_location: str,
|
|
backend_spec: str,
|
|
):
|
|
full_path = Path(instance_config.store_path / local_path)
|
|
if full_path.exists():
|
|
ensure_backend_type(full_path, schema_location, backend_spec)
|
|
else:
|
|
full_path.mkdir(parents=True, exist_ok=False)
|
|
create_backend(instance_config, full_path, schema_location, backend_spec)
|
|
|
|
|
|
def ensure_backend_type(
|
|
path: Path,
|
|
schema_location: str,
|
|
backend_spec: str,
|
|
):
|
|
backend_name, extension = get_backend_and_extension(backend_spec)
|
|
if backend_name is 'record_dir':
|
|
try:
|
|
config = Config.get_record_dir_config(path)
|
|
except ConfigError as e:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"did not find record_dir store in '{path}', reason: {e}",
|
|
) from e
|
|
if config.schema != schema_location:
|
|
raise HTTPException(
|
|
status_code=HTTP_409_CONFLICT,
|
|
detail=f"existing record_dir store has different schema: '{config.schema}'",
|
|
)
|
|
elif backend_name is 'sqlite':
|
|
pass
|
|
else:
|
|
raise ValueError(f"unknown backend type: '{backend_spec}'")
|
|
"""
|