dump-things-server/dump_things_service/token_endpoints.py
Christian Monch 0225d8647b
Some checks failed
Test execution / Test-all (push) Failing after 1m34s
[temp] adjust tests
2026-05-06 17:22:44 +02:00

183 lines
5.4 KiB
Python

import hashlib
import logging
import random
import sys
from urllib.parse import quote
from fastapi import (
APIRouter,
Depends,
HTTPException,
Response,
)
from dump_things_service import (
HTTP_201_CREATED,
HTTP_404_NOT_FOUND,
HTTP_409_CONFLICT,
)
from dump_things_service.abstract_config import (
TokenConfig,
read_config,
store_config,
)
from dump_things_service.admin import authenticate_admin
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.instance_state import get_instance_state
from dump_things_service.exceptions import ConfigError
from dump_things_service.manifest import manifest_configuration
from dump_things_service.utils import wrap_http_exception
logger = logging.getLogger('dump_things_service')
router = APIRouter()
class TokenRequest(TokenConfig):
name: str
def get_token_parts(token: str) -> list[str]:
parts = token.split('-', 1)
if len(parts) != 2:
msg = 'Invalid token format'
raise ValueError(msg)
return parts
def hash_token(token: str) -> str:
parts = get_token_parts(token)
hasher = hashlib.sha256()
hasher.update(parts[1].encode())
return f'{parts[0]}-{hasher.hexdigest()}'
@router.post(
'/tokens',
tags=['Administration interface'],
name='Create a new token',
status_code=HTTP_201_CREATED,
)
async def create_token(
response: Response,
body: TokenRequest,
api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
instance_state = get_instance_state()
authenticate_admin(instance_state, api_key)
abstract_config = read_config(store_path=instance_state.store_path)
# Check for existing token-name
if body.name in abstract_config.tokens:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"Token with name '{body.name}' already exists.",
)
# Ensure that all specified collections and modes exist
for collection_name, token_collection_info in body.collections.items():
if collection_name not in abstract_config.collections:
detail = f"No such collection: '{collection_name}'."
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
print(f'IMPLEMENT: check incoming label ({token_collection_info.incoming_label}), check mode ({token_collection_info.mode})', file=sys.stderr, flush=True)
# TODO: check mode(!), check incoming_label(?)
if body.representation:
# We have a specific representation, check that it is not already used
for token in abstract_config.tokens.values():
if token.representation == body.representation:
detail= f"Representation '{body.representation}' already exists."
raise HTTPException(status_code=HTTP_409_CONFLICT, detail=detail)
else:
# Generate a random representation that does not yet exist.
collision = True
while collision:
body.representation = random.randbytes(24).hex()
collision = any(
map(
lambda t: t.representation == body.representation,
instance_state.xxx_tokens
)
)
# Store the new token in the configuration
abstract_config.tokens[body.name] = TokenConfig(
user_id=body.user_id,
collections=body.collections,
representation=body.representation,
hashed=body.hashed,
)
# Manifest the configuration
with wrap_http_exception(ConfigError):
manifest_configuration(abstract_config, instance_state)
# Persist the configuration
store_config(
store_path=instance_state.store_path,
config=abstract_config,
)
response.headers['Location'] = f'/tokens/{quote(body.name)}'
return TokenRequest(
name=body.name,
user_id=body.user_id,
collections=body.collections,
representation=body.representation,
hashed=body.hashed,
)
@router.get(
'/tokens',
tags=['Administration interface'],
name='Get existing tokens',
)
async def get_tokens(
api_key: str = Depends(api_key_header_scheme),
) -> list[TokenRequest]:
instance_config = get_config()
authenticate_admin(instance_config, api_key)
abstract_config = read_config(store_path=instance_config.store_path)
return [
TokenRequest(
name=n,
user_id=t.user_id,
representation=t.representation,
collections=t.collections,
hashed=t.hashed,
)
for n, t in abstract_config.tokens.items()
]
@router.get(
'/tokens/{token_name}',
tags=['Administration interface'],
name='Get token by name',
)
async def get_token_with_name(
token_name: str,
api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
instance_state = get_instance_state()
authenticate_admin(instance_state, api_key)
abstract_config = read_config(store_path=instance_state.store_path)
if token_name not in abstract_config.tokens:
detail = f"token with name '{token_name}' does not exist."
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
t = abstract_config.tokens[token_name]
return TokenRequest(
name=token_name,
user_id=t.user_id,
representation=t.representation,
collections=t.collections,
hashed=t.hashed,
)