This commit fixes a bug where the configuration was not persisted after a token was deleted.
351 lines
10 KiB
Python
351 lines
10 KiB
Python
import logging
|
|
import random
|
|
import re
|
|
from urllib.parse import quote
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Depends,
|
|
HTTPException,
|
|
Response,
|
|
)
|
|
from starlette.status import HTTP_406_NOT_ACCEPTABLE
|
|
|
|
from dump_things_service import (
|
|
HTTP_201_CREATED,
|
|
HTTP_404_NOT_FOUND,
|
|
HTTP_409_CONFLICT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
AdminTokenConfig,
|
|
StrictModel,
|
|
TokenCollectionConfig,
|
|
TokenConfig,
|
|
get_config,
|
|
get_token_info_by_representation,
|
|
get_token_permissions,
|
|
hash_token_representation,
|
|
read_config,
|
|
store_config,
|
|
)
|
|
from dump_things_service.admin import authenticate_admin
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from dump_things_service.instance_state import get_instance_state
|
|
from dump_things_service.exceptions import ConfigError
|
|
from dump_things_service.manifest import manifest_configuration
|
|
from dump_things_service.utils import wrap_http_exception
|
|
|
|
|
|
# Logger registered under the fixed name 'dump_things_service'.
logger = logging.getLogger('dump_things_service')

# Router on which the token administration endpoints of this module are
# registered.
router = APIRouter()

# Pattern for a string of 64 hexadecimal digits (upper or lower case). It is
# used to validate the representation of admin tokens.
hash_matcher = re.compile(r'^[a-f0-9A-F]{64}$')
|
|
|
|
|
|
# Request body for creating a token: all fields of `TokenConfig` plus the
# name under which the token is stored in the configuration.
class TokenRequest(TokenConfig):
    name: str
|
|
|
|
|
|
# Description of a token as returned by the read endpoints. It contains the
# token name, the user id and the per-collection settings; it has no field
# for the token representation.
class TokenResponse(StrictModel):
    name: str
    user_id: str
    collections: dict[str, TokenCollectionConfig]
|
|
|
|
|
|
# Request body for creating an admin token: all fields of `AdminTokenConfig`
# plus the name under which the admin token is stored.
class AdminTokenRequest(AdminTokenConfig):
    name: str
|
|
|
|
|
|
def get_token_parts(token: str) -> list[str]:
    """Split a token at its first hyphen.

    Returns a two-element list: the text before the first ``-`` and
    everything after it (the second element may itself contain hyphens).

    Raises:
        ValueError: if ``token`` contains no hyphen at all.
    """
    prefix, hyphen, remainder = token.partition('-')
    if not hyphen:
        # No separator found, so the token cannot be split into two parts.
        msg = 'Invalid token format'
        raise ValueError(msg)
    return [prefix, remainder]
|
|
|
|
|
|
@router.post(
    '/tokens',
    tags=['Administration interface'],
    name='Create a new token',
    status_code=HTTP_201_CREATED,
)
async def create_token(
    response: Response,
    body: TokenRequest,
    api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
    # Create a new token, manifest the changed configuration and persist it.
    #
    # The caller must authenticate as administrator via `api_key`. The
    # request is rejected with
    #   - 409 if a token with the same name or the same representation
    #     already exists,
    #   - 404 if a referenced collection is not configured,
    #   - 406 if the token needs an incoming area for a collection, but the
    #     collection has no `incoming` or the token has no incoming label.
    #
    # If no representation is given, a random one is generated. The response
    # carries the plain representation, while the stored configuration holds
    # the hashed representation if `body.hashed` is set. The `Location`
    # header of the response points to the new token.

    # Imported locally so that this function does not depend on a change of
    # the module-level import block.
    import secrets

    instance_state = get_instance_state()
    abstract_config = read_config(store_path=instance_state.store_path)

    authenticate_admin(instance_state, abstract_config, api_key)

    # Check for existing token-name
    if body.name in abstract_config.tokens:
        raise HTTPException(
            status_code=HTTP_409_CONFLICT,
            detail=f"Token with name '{body.name}' already exists.",
        )

    # Ensure that all specified collections and modes exist
    for collection_name, token_collection_info in body.collections.items():
        if collection_name not in abstract_config.collections:
            detail = f"No such collection: '{collection_name}'."
            raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)

        # Check that incoming areas are defined if the token allows writing.
        token_permissions = get_token_permissions(token_collection_info.mode)
        if token_permissions.incoming_write or token_permissions.zones_access:

            # Check for incoming definition in collection config
            collection_info = abstract_config.collections[collection_name]
            if not collection_info.incoming:
                detail = (
                    f"Cannot add token with write access to collection "
                    f"'{collection_name}' without `incoming`."
                )
                raise HTTPException(
                    status_code=HTTP_406_NOT_ACCEPTABLE,
                    detail=detail,
                )

            # Check for incoming label in token definition for the collection
            if not token_collection_info.incoming_label:
                detail = f"Incoming label missing for collection '{collection_name}'"
                raise HTTPException(
                    status_code=HTTP_406_NOT_ACCEPTABLE,
                    detail=detail,
                )

    if body.representation:
        # We have a specific representation, check that it is not already used
        existing_token_info = get_token_info_by_representation(
            abstract_config=abstract_config,
            token_representation=body.representation,
        )
        if existing_token_info:
            detail = 'Token with identical representation already exists.'
            raise HTTPException(status_code=HTTP_409_CONFLICT, detail=detail)
    else:
        # Generate a random representation that does not yet exist. The
        # representation is an access credential, so it is drawn from the
        # cryptographically secure `secrets` module instead of `random`.
        # `token_hex(24)` yields 24 random bytes as 48 hex digits.
        while True:
            body.representation = secrets.token_hex(24)
            existing_token_info = get_token_info_by_representation(
                abstract_config=abstract_config,
                token_representation=body.representation,
            )
            if existing_token_info is None:
                break

    # Store the new token in the configuration. Only the stored copy is
    # hashed (if requested); `body.representation` stays untouched because
    # it is returned to the caller below.
    abstract_config.tokens[body.name] = TokenConfig(
        user_id=body.user_id,
        collections=body.collections,
        representation=(
            hash_token_representation(body.representation)
            if body.hashed
            else body.representation
        ),
    )

    # Manifest the new configuration. This happens before persisting, so a
    # `ConfigError` raised here leaves the stored configuration unchanged.
    # NOTE(review): `wrap_http_exception` presumably turns the `ConfigError`
    # into an HTTP error response -- confirm in `utils`.
    with wrap_http_exception(ConfigError):
        manifest_configuration(abstract_config, instance_state)

    # Persist the configuration
    store_config(
        store_path=instance_state.store_path,
        config=abstract_config,
    )

    response.headers['Location'] = f'/tokens/{quote(body.name)}'
    return TokenRequest(
        name=body.name,
        user_id=body.user_id,
        collections=body.collections,
        representation=body.representation,
        hashed=body.hashed,
    )
|
|
|
|
|
|
@router.get(
    '/tokens',
    tags=['Administration interface'],
    name='Get existing tokens',
)
async def get_tokens(
    api_key: str = Depends(api_key_header_scheme),
) -> list[TokenResponse]:
    # List all tokens of the stored configuration. The caller must
    # authenticate as administrator via `api_key`. Each entry reports name,
    # user id and collection settings of one token.
    state = get_instance_state()
    stored_config = read_config(store_path=state.store_path)

    authenticate_admin(state, stored_config, api_key)

    listing = []
    for token_name, token_config in stored_config.tokens.items():
        listing.append(
            TokenResponse(
                name=token_name,
                user_id=token_config.user_id,
                collections=token_config.collections,
            )
        )
    return listing
|
|
|
|
|
|
@router.get(
    '/tokens/{token_name}',
    tags=['Administration interface'],
    name='Get token by name',
)
async def get_token_with_name(
    token_name: str,
    api_key: str = Depends(api_key_header_scheme),
) -> TokenResponse:
    # Return name, user id and collection settings of the token called
    # `token_name`. The caller must authenticate as administrator via
    # `api_key`. Responds with 404 if no such token exists.
    state = get_instance_state()

    # NOTE(review): authentication uses the configuration from `get_config()`
    # while the lookup below uses the one read from the store; other
    # endpoints in this module authenticate against `read_config(...)`.
    # Confirm that this difference is intended.
    authenticate_admin(state, get_config(), api_key)

    known_tokens = read_config(store_path=state.store_path).tokens
    if token_name in known_tokens:
        entry = known_tokens[token_name]
        return TokenResponse(
            name=token_name,
            user_id=entry.user_id,
            collections=entry.collections,
        )

    detail = f"token with name '{token_name}' does not exist."
    raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)
|
|
|
|
|
|
@router.delete(
    '/tokens/{token_name}',
    tags=['Administration interface'],
    name='Delete token with name',
)
async def delete_token_with_name(
    token_name: str,
    api_key: str = Depends(api_key_header_scheme),
):
    # Remove the token called `token_name`, manifest the changed
    # configuration and persist it. The caller must authenticate as
    # administrator via `api_key`. Responds with 404 if no such token exists.
    state = get_instance_state()

    # NOTE(review): authentication uses the configuration from `get_config()`
    # while the modification below is applied to the one read from the
    # store; other endpoints in this module authenticate against
    # `read_config(...)`. Confirm that this difference is intended.
    authenticate_admin(state, get_config(), api_key)

    stored_config = read_config(store_path=state.store_path)
    if token_name not in stored_config.tokens:
        detail = f"token with name '{token_name}' does not exist."
        raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)

    # Drop the token from the configuration
    del stored_config.tokens[token_name]

    # Manifest the changed configuration before it is written, so that a
    # `ConfigError` raised here leaves the stored configuration unchanged.
    with wrap_http_exception(ConfigError):
        manifest_configuration(stored_config, state)

    # Write the configuration without the deleted token back to the store.
    store_config(store_path=state.store_path, config=stored_config)
|
|
|
|
|
|
@router.post(
    '/admin_tokens',
    tags=['Administration interface'],
    name='Add a new admin token',
    status_code=HTTP_201_CREATED,
)
async def create_admin_token(
    body: AdminTokenRequest,
    api_key: str = Depends(api_key_header_scheme),
):
    # Add a new admin token to the stored configuration. The caller must
    # authenticate as administrator via `api_key`. The request is rejected
    # with
    #   - 406 if the representation is empty or, after removing surrounding
    #     whitespace, is not a 64-digit hexadecimal string,
    #   - 409 if an admin token with the same name already exists.

    instance_state = get_instance_state()
    abstract_config = read_config(store_path=instance_state.store_path)

    authenticate_admin(instance_state, abstract_config, api_key)

    # Check for token content
    if not body.representation:
        detail = 'Empty administrator token is not allowed'
        raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=detail)

    # Validate and store the same, whitespace-stripped value. Storing the
    # unstripped input would persist a representation that passed the check
    # only because the check ignored its surrounding whitespace.
    representation = body.representation.strip()
    if not hash_matcher.match(representation):
        detail = 'Hashed token is not a 64-digits hex-number'
        raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=detail)

    # Check for existing token-name
    if body.name in abstract_config.admin_tokens:
        raise HTTPException(
            status_code=HTTP_409_CONFLICT,
            detail=f"Admin token with name '{body.name}' already exists.",
        )

    # It is sufficient to add the new admin token to the admin_token dictionary
    # in order to manifest the new configuration.
    abstract_config.admin_tokens[body.name] = AdminTokenConfig(
        representation=representation,
    )

    # Persist the configuration.
    store_config(
        store_path=instance_state.store_path,
        config=abstract_config,
    )
|
|
|
|
|
|
@router.get(
    '/admin_tokens',
    tags=['Administration interface'],
    name='Get admin token names',
)
async def get_admin_token(
    api_key: str = Depends(api_key_header_scheme),
) -> list[str]:
    # Return the names of all admin tokens in the stored configuration. The
    # caller must authenticate as administrator via `api_key`. The entry
    # '__bootstrap__' is appended if the instance state has a bootstrap token.
    state = get_instance_state()
    stored_config = read_config(store_path=state.store_path)

    authenticate_admin(state, stored_config, api_key)

    names = list(stored_config.admin_tokens)
    if state.bootstrap_token is not None:
        names.append('__bootstrap__')
    return names
|
|
|
|
|
|
@router.delete(
    '/admin_tokens/{token_name}',
    tags=['Administration interface'],
    name='Delete admin token with name',
)
async def delete_admin_token(
    token_name: str,
    api_key: str = Depends(api_key_header_scheme),
):
    # Remove the admin token called `token_name` from the stored
    # configuration and write the configuration back. The caller must
    # authenticate as administrator via `api_key`. Responds with 404 if no
    # admin token with that name exists.
    state = get_instance_state()
    stored_config = read_config(store_path=state.store_path)

    authenticate_admin(state, stored_config, api_key)

    admin_tokens = stored_config.admin_tokens

    # Refuse to delete what is not there
    if token_name not in admin_tokens:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"Admin token with name '{token_name}' does not exist.",
        )

    del admin_tokens[token_name]

    # Write the reduced configuration back to the store.
    store_config(store_path=state.store_path, config=stored_config)
|