dump-things-server/dump_things_service/token_endpoints.py
Christian Monch 0780c57f4c persist configuration after token deletion
This commit fixes a bug, where a configuration was
not persisted after a token was deleted.
2026-06-17 10:27:51 +02:00

351 lines
10 KiB
Python

import logging
import random
import re
from urllib.parse import quote
from fastapi import (
APIRouter,
Depends,
HTTPException,
Response,
)
from starlette.status import HTTP_406_NOT_ACCEPTABLE
from dump_things_service import (
HTTP_201_CREATED,
HTTP_404_NOT_FOUND,
HTTP_409_CONFLICT,
)
from dump_things_service.abstract_config import (
AdminTokenConfig,
StrictModel,
TokenCollectionConfig,
TokenConfig,
get_config,
get_token_info_by_representation,
get_token_permissions,
hash_token_representation,
read_config,
store_config,
)
from dump_things_service.admin import authenticate_admin
from dump_things_service.api_key import api_key_header_scheme
from dump_things_service.instance_state import get_instance_state
from dump_things_service.exceptions import ConfigError
from dump_things_service.manifest import manifest_configuration
from dump_things_service.utils import wrap_http_exception
logger = logging.getLogger('dump_things_service')
router = APIRouter()
hash_matcher = re.compile(r'^[a-f0-9A-F]{64}$')
class TokenRequest(TokenConfig):
name: str
class TokenResponse(StrictModel):
name: str
user_id: str
collections: dict[str, TokenCollectionConfig]
class AdminTokenRequest(AdminTokenConfig):
name: str
def get_token_parts(token: str) -> list[str]:
parts = token.split('-', 1)
if len(parts) != 2:
msg = 'Invalid token format'
raise ValueError(msg)
return parts
@router.post(
'/tokens',
tags=['Administration interface'],
name='Create a new token',
status_code=HTTP_201_CREATED,
)
async def create_token(
response: Response,
body: TokenRequest,
api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
instance_state = get_instance_state()
abstract_config = read_config(store_path=instance_state.store_path)
authenticate_admin(instance_state, abstract_config, api_key)
# Check for existing token-name
if body.name in abstract_config.tokens:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"Token with name '{body.name}' already exists.",
)
# Ensure that all specified collections and modes exist
for collection_name, token_collection_info in body.collections.items():
if collection_name not in abstract_config.collections:
detail = f"No such collection: '{collection_name}'."
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
# Check that incoming areas are defined if the token allows writing.
token_permissions = get_token_permissions(token_collection_info.mode)
if token_permissions.incoming_write or token_permissions.zones_access:
# Check for incoming definition in collection config
collection_info = abstract_config.collections[collection_name]
if not collection_info.incoming:
detail = (
f"Cannot add token with write access to collection "
f"'{collection_name}' without `incoming`."
)
raise HTTPException(
status_code=HTTP_406_NOT_ACCEPTABLE,
detail=detail,
)
# Check for incoming label in token definition for the collection
if not token_collection_info.incoming_label:
detail = f"Incoming label missing for collection '{collection_name}'"
raise HTTPException(
status_code=HTTP_406_NOT_ACCEPTABLE,
detail=detail,
)
if body.representation:
# We have a specific representation, check that it is not already used
existing_token_info = get_token_info_by_representation(
abstract_config=abstract_config,
token_representation=body.representation,
)
if existing_token_info:
detail= f"Token with identical representation already exists."
raise HTTPException(status_code=HTTP_409_CONFLICT, detail=detail)
else:
# Generate a random representation that does not yet exist.
collision = True
while collision:
body.representation = random.randbytes(24).hex()
existing_token_info = get_token_info_by_representation(
abstract_config=abstract_config,
token_representation=body.representation,
)
collision = existing_token_info is not None
# Store the new token in the configuration
abstract_config.tokens[body.name] = TokenConfig(
user_id=body.user_id,
collections=body.collections,
representation=(
hash_token_representation(body.representation)
if body.hashed
else body.representation
),
)
# Manifest the new configuration
with wrap_http_exception(ConfigError):
manifest_configuration(abstract_config, instance_state)
# Persist the configuration
store_config(
store_path=instance_state.store_path,
config=abstract_config,
)
response.headers['Location'] = f'/tokens/{quote(body.name)}'
return TokenRequest(
name=body.name,
user_id=body.user_id,
collections=body.collections,
representation=body.representation,
hashed=body.hashed,
)
@router.get(
'/tokens',
tags=['Administration interface'],
name='Get existing tokens',
)
async def get_tokens(
api_key: str = Depends(api_key_header_scheme),
) -> list[TokenResponse]:
instance_state = get_instance_state()
abstract_config = read_config(store_path=instance_state.store_path)
authenticate_admin(instance_state, abstract_config, api_key)
return [
TokenResponse(
name=n,
user_id=t.user_id,
collections=t.collections,
)
for n, t in abstract_config.tokens.items()
]
@router.get(
'/tokens/{token_name}',
tags=['Administration interface'],
name='Get token by name',
)
async def get_token_with_name(
token_name: str,
api_key: str = Depends(api_key_header_scheme),
) -> TokenResponse:
instance_state = get_instance_state()
abstract_config = get_config()
authenticate_admin(instance_state, abstract_config, api_key)
abstract_config = read_config(store_path=instance_state.store_path)
if token_name not in abstract_config.tokens:
detail = f"token with name '{token_name}' does not exist."
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
t = abstract_config.tokens[token_name]
return TokenResponse(
name=token_name,
user_id=t.user_id,
collections=t.collections,
)
@router.delete(
'/tokens/{token_name}',
tags=['Administration interface'],
name='Delete token with name',
)
async def delete_token_with_name(
token_name: str,
api_key: str = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
abstract_config = get_config()
authenticate_admin(instance_state, abstract_config, api_key)
abstract_config = read_config(store_path=instance_state.store_path)
if token_name not in abstract_config.tokens:
detail = f"token with name '{token_name}' does not exist."
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
# Store the new token in the configuration
del abstract_config.tokens[token_name]
# Manifest the new configuration
with wrap_http_exception(ConfigError):
manifest_configuration(abstract_config, instance_state)
# Persist the configuration.
store_config(
store_path=instance_state.store_path,
config=abstract_config,
)
@router.post(
'/admin_tokens',
tags=['Administration interface'],
name='Add a new admin token',
status_code=HTTP_201_CREATED,
)
async def create_admin_token(
body: AdminTokenRequest,
api_key: str = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
abstract_config = read_config(store_path=instance_state.store_path)
authenticate_admin(instance_state, abstract_config, api_key)
# Check for token content
if not body.representation:
detail='Empty administrator token is not allowed'
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=detail)
if not hash_matcher.match(body.representation.strip()):
detail='Hashed token is not a 64-digits hex-number'
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=detail)
# Check for existing token-name
if body.name in abstract_config.admin_tokens:
raise HTTPException(
status_code=HTTP_409_CONFLICT,
detail=f"Admin token with name '{body.name}' already exists.",
)
# It is sufficient to add the new admin token to the admin_token dictionary
# in order to manifest the new configuration.
abstract_config.admin_tokens[body.name] = AdminTokenConfig(
representation=body.representation,
)
# Persist the configuration.
store_config(
store_path=instance_state.store_path,
config=abstract_config,
)
@router.get(
'/admin_tokens',
tags=['Administration interface'],
name='Get admin token names',
)
async def get_admin_token(
api_key: str = Depends(api_key_header_scheme),
) -> list[str]:
instance_state = get_instance_state()
abstract_config = read_config(store_path=instance_state.store_path)
authenticate_admin(instance_state, abstract_config, api_key)
return list(abstract_config.admin_tokens) + (
[]
if instance_state.bootstrap_token is None
else ['__bootstrap__']
)
@router.delete(
'/admin_tokens/{token_name}',
tags=['Administration interface'],
name='Delete admin token with name',
)
async def delete_admin_token(
token_name: str,
api_key: str = Depends(api_key_header_scheme),
):
instance_state = get_instance_state()
abstract_config = read_config(store_path=instance_state.store_path)
authenticate_admin(instance_state, abstract_config, api_key)
# Check for token existence
if token_name not in abstract_config.admin_tokens:
raise HTTPException(
status_code=HTTP_404_NOT_FOUND,
detail=f"Admin token with name '{token_name}' does not exist.",
)
del abstract_config.admin_tokens[token_name]
# Persist the configuration.
store_config(
store_path=instance_state.store_path,
config=abstract_config,
)