183 lines
5.4 KiB
Python
183 lines
5.4 KiB
Python
import hashlib
|
|
import logging
|
|
import random
|
|
import sys
|
|
from urllib.parse import quote
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Depends,
|
|
HTTPException,
|
|
Response,
|
|
)
|
|
|
|
from dump_things_service import (
|
|
HTTP_201_CREATED,
|
|
HTTP_404_NOT_FOUND,
|
|
HTTP_409_CONFLICT,
|
|
)
|
|
from dump_things_service.abstract_config import (
|
|
TokenConfig,
|
|
read_config,
|
|
store_config,
|
|
)
|
|
from dump_things_service.admin import authenticate_admin
|
|
from dump_things_service.api_key import api_key_header_scheme
|
|
from dump_things_service.instance_state import get_instance_state
|
|
from dump_things_service.exceptions import ConfigError
|
|
from dump_things_service.manifest import manifest_configuration
|
|
from dump_things_service.utils import wrap_http_exception
|
|
|
|
|
|
# Router on which the token administration endpoints below are registered.
router = APIRouter()

# Logger shared under the service's package name.
logger = logging.getLogger('dump_things_service')
|
|
|
|
|
|
# Token description exchanged by the token endpoints: all fields inherited
# from `TokenConfig`, plus the name the token is filed under.
# (Kept as `#` comments: a class docstring could surface in the generated
# API schema.)
class TokenRequest(TokenConfig):
    # Key of the token in the configuration's `tokens` mapping.
    name: str
|
|
|
|
|
|
def get_token_parts(token: str) -> list[str]:
    """Split *token* at its first '-' into a two-element list.

    Everything before the first '-' is the first element, everything after
    it (including any further '-' characters) is the second.

    Raises:
        ValueError: if *token* contains no '-'.
    """
    head, separator, tail = token.partition('-')
    if not separator:
        # No dash at all: there is nothing to split on.
        msg = 'Invalid token format'
        raise ValueError(msg)
    return [head, tail]
|
|
|
|
|
|
def hash_token(token: str) -> str:
    """Return *token* with the text after its first '-' replaced by a hash.

    The part before the first '-' is kept verbatim; the remainder is encoded
    with the default string encoding and replaced by its SHA-256 hex digest.

    Raises:
        ValueError: propagated from `get_token_parts` when *token* has no '-'.
    """
    prefix, secret = get_token_parts(token)
    digest = hashlib.sha256(secret.encode()).hexdigest()
    return f'{prefix}-{digest}'
|
|
|
|
|
|
# Admin endpoint: add a new named token to the stored configuration.
#
# Steps, in order:
#   1. authenticate the caller via `authenticate_admin`,
#   2. reject a name that already exists (409) or a collection that is not
#      configured (404),
#   3. reject an explicitly requested representation that is already in use
#      (409), or generate a fresh random one when none was supplied,
#   4. add the token to the configuration, manifest it, and persist it.
#
# On success the `Location` header points at `/tokens/<quoted name>` and the
# created token (including a generated representation) is returned.
@router.post(
    '/tokens',
    tags=['Administration interface'],
    name='Create a new token',
    status_code=HTTP_201_CREATED,
)
async def create_token(
    response: Response,
    body: TokenRequest,
    api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
    # Imported locally so this handler carries its own dependency. `secrets`
    # is the stdlib source for security-sensitive random values; the
    # `random` module is not meant for that purpose.
    import secrets

    instance_state = get_instance_state()
    authenticate_admin(instance_state, api_key)

    abstract_config = read_config(store_path=instance_state.store_path)

    # Check for existing token-name
    if body.name in abstract_config.tokens:
        raise HTTPException(
            status_code=HTTP_409_CONFLICT,
            detail=f"Token with name '{body.name}' already exists.",
        )

    # Ensure that all specified collections and modes exist
    for collection_name, token_collection_info in body.collections.items():
        if collection_name not in abstract_config.collections:
            detail = f"No such collection: '{collection_name}'."
            raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=detail)

        print(f'IMPLEMENT: check incoming label ({token_collection_info.incoming_label}), check mode ({token_collection_info.mode})', file=sys.stderr, flush=True)
        # TODO: check mode(!), check incoming_label(?)

    def representation_in_use(candidate: str) -> bool:
        # Both branches below check against the configuration that was just
        # read, so explicit and generated representations follow one rule.
        return any(
            existing.representation == candidate
            for existing in abstract_config.tokens.values()
        )

    if body.representation:
        # We have a specific representation, check that it is not already used
        if representation_in_use(body.representation):
            detail = f"Representation '{body.representation}' already exists."
            raise HTTPException(status_code=HTTP_409_CONFLICT, detail=detail)
    else:
        # Generate a random representation (24 random bytes, hex encoded)
        # that does not yet exist.
        candidate = secrets.token_hex(24)
        while representation_in_use(candidate):
            candidate = secrets.token_hex(24)
        body.representation = candidate

    # Store the new token in the configuration
    abstract_config.tokens[body.name] = TokenConfig(
        user_id=body.user_id,
        collections=body.collections,
        representation=body.representation,
        hashed=body.hashed,
    )

    # Manifest the configuration
    with wrap_http_exception(ConfigError):
        manifest_configuration(abstract_config, instance_state)

    # Persist the configuration
    store_config(
        store_path=instance_state.store_path,
        config=abstract_config,
    )

    # The name is percent-quoted so it is safe inside the URL path.
    response.headers['Location'] = f'/tokens/{quote(body.name)}'
    return TokenRequest(
        name=body.name,
        user_id=body.user_id,
        collections=body.collections,
        representation=body.representation,
        hashed=body.hashed,
    )
|
|
|
|
|
|
# Admin endpoint: list every token in the stored configuration.
#
# The caller is authenticated via `authenticate_admin`; the configuration is
# then read from the instance's store path and each entry of its `tokens`
# mapping is returned as a `TokenRequest` carrying the mapping key as `name`.
@router.get(
    '/tokens',
    tags=['Administration interface'],
    name='Get existing tokens',
)
async def get_tokens(
    api_key: str = Depends(api_key_header_scheme),
) -> list[TokenRequest]:
    # Use the same accessor as the other token endpoints; this module
    # neither imports nor defines a `get_config` function.
    instance_state = get_instance_state()
    authenticate_admin(instance_state, api_key)

    abstract_config = read_config(store_path=instance_state.store_path)
    return [
        TokenRequest(
            name=name,
            user_id=token.user_id,
            representation=token.representation,
            collections=token.collections,
            hashed=token.hashed,
        )
        for name, token in abstract_config.tokens.items()
    ]
|
|
|
|
|
|
# Admin endpoint: return the single token stored under `token_name`.
#
# The caller is authenticated via `authenticate_admin`. A name that is not a
# key of the configuration's `tokens` mapping results in a 404 response.
@router.get(
    '/tokens/{token_name}',
    tags=['Administration interface'],
    name='Get token by name',
)
async def get_token_with_name(
    token_name: str,
    api_key: str = Depends(api_key_header_scheme),
) -> TokenRequest:
    state = get_instance_state()
    authenticate_admin(state, api_key)

    known_tokens = read_config(store_path=state.store_path).tokens

    # Guard clause: unknown names are reported before any lookup happens.
    if token_name not in known_tokens:
        raise HTTPException(
            status_code=HTTP_404_NOT_FOUND,
            detail=f"token with name '{token_name}' does not exist.",
        )

    stored = known_tokens[token_name]
    return TokenRequest(
        name=token_name,
        user_id=stored.user_id,
        representation=stored.representation,
        collections=stored.collections,
        hashed=stored.hashed,
    )
|