dump-things-server/dump_things_service/commands/load_config.py
Christian Monch 52119f3b60
All checks were successful
Test execution / Test-all (push) Successful in 2m14s
fix backend handling in config format conversion
2026-05-26 17:22:44 +02:00

236 lines
7.3 KiB
Python

from __future__ import annotations
import os
import sys
from argparse import ArgumentParser
from itertools import count
from pathlib import Path
import requests
import yaml
from dump_things_service.instance_state import get_record_dir_config
parser = ArgumentParser(
prog='Establish a configuration in a running service',
description='Read a configuration from a dump-things configuration-file '
'and instantiate its elements on a running server. Objects that '
'already exist on the server are left unchanged. '
' '
'An admin token has to be provided in the environment variable '
'`DTS_ADMIN_TOKEN`.',
)
parser.add_argument(
'config_file',
help='The path to the config file',
)
parser.add_argument(
'--send-to',
help='The base URL of the server API',
)
parser.add_argument(
'--old-format',
action='store_true',
help='If provided, assume that the configuration is in the old format '
'and convert it to the new format internally (in old format: tokens '
'had no `hashed`-attribute and no `representation`-attribute, the token '
'representation was the key of the token configuration, '
'collections had no `schema`-attribute, and `sqlite`-backends had '
'a `schema`-attribute).',
)
parser.add_argument(
'--store',
default=None,
help='If --old-format is provided, this option can be used to specify a '
'store directory. The store directory will be used to load `RecordDir` '
'configurations, if a collection defines are `RecordDir`-backend. '
'(This option has no effect if no collection in the old configuration '
'uses a `RecordDir`-backend.)',
)
def main():
arguments = parser.parse_args()
with open(arguments.config_file) as config_file:
configuration = yaml.safe_load(config_file)
assert configuration['type'] == 'collections', '`type`-entry missing in old config-file'
if arguments.old_format:
configuration = convert_to_new_format(configuration, arguments.store)
else:
if arguments.store:
print(
'Warning: ignoring `--store` option because `--old-format` '
'is not provided.',
file=sys.stderr,
flush=True,
)
assert configuration['version'] == 2, '`version: 2` missing in config-file'
if arguments.send_to:
admin_token = os.environ.get('DTS_ADMIN_TOKEN')
if not admin_token:
print(
'An admin token not provided in the environment variable `DTS_ADMIN_TOKEN`',
file=sys.stderr,
flush=True,
)
return 1
try:
establish_configuration(
configuration,
arguments.send_to[:-1]
if arguments.send_to.endswith('/')
else arguments.send_to,
admin_token,
)
return 0
except RuntimeError as rte:
print(f'{rte.args[0]}', file=sys.stderr, flush=True)
return 2
print(
yaml.dump(
data=configuration,
sort_keys=False,
allow_unicode=True,
default_flow_style=False,
)
)
return 0
def convert_to_new_format(
old_configuration: dict,
store_path: str | Path,
) -> dict:
assert old_configuration['version'] == 1, '`version: 1` missing in old config-file'
counter = count(1)
new_tokens_dict = {
f'token_{next(counter)}': {
**old_token_config.copy(),
'representation': token_representation,
'hashed': False
}
for token_representation, old_token_config in old_configuration['tokens'].items()
}
old_to_new_token_mapping = {
token_config['representation']: token_name
for token_name, token_config in new_tokens_dict.items()
}
store_path = Path(store_path)
for collection_name, collection_config in old_configuration['collections'].items():
backend = collection_config.get('backend')
if backend and backend['type'].startswith('sqlite'):
collection_config['schema'] = backend['schema']
del backend['schema']
elif not backend or backend['type'].startswith('record_dir'):
if store_path is None:
msg = '--store <path> has to be provided to convert collection with record_dir-backends'
raise ValueError(msg)
record_dir_config = get_record_dir_config(store_path / collection_config['curated'])
collection_config['schema'] = record_dir_config.schema
backend = {
'type': 'record_dir+stl' if not backend else backend['type'],
'mapping_method': record_dir_config.idfx.value
}
collection_config['backend'] = backend
collection_config['default_token'] = old_to_new_token_mapping[collection_config['default_token']]
new_configuration = {
'type': old_configuration['type'],
'version': 2,
'tokens': new_tokens_dict,
'collections': old_configuration['collections'],
'admin_tokens': {},
}
return new_configuration
def establish_configuration(
configuration: dict,
api_url: str,
admin_token: str,
):
create_collections(configuration, api_url, admin_token)
create_tokens(configuration, api_url, admin_token)
create_admin_tokens(configuration, api_url, admin_token)
def create_tokens(
configuration: dict,
api_url: str,
admin_token: str,
):
for token_name, token_config in configuration['tokens'].items():
_post_data(
url=api_url + '/tokens',
data={
**token_config,
'name': token_name,
},
token=admin_token,
content_class='token',
content_name=token_name,
)
def create_collections(
configuration: dict,
api_url: str,
admin_token: str,
):
for collection_name, collection_config in configuration['collections'].items():
_post_data(
url=api_url + '/collections',
data={
**collection_config,
'name': collection_name,
},
token=admin_token,
content_class='collection',
content_name=collection_name,
)
def create_admin_tokens(
configuration: dict,
api_url: str,
admin_token: str,
):
for admin_token_name, admin_token_config in configuration['admin_tokens'].items():
_post_data(
url=api_url + '/admin_tokens',
data={
**admin_token_config,
'name': admin_token_name,
},
token=admin_token,
content_class='admin token',
content_name=admin_token_name,
)
def _post_data(
url: str,
data: dict,
token: str,
content_class: str,
content_name: str,
):
result = requests.post(url, headers={'x-dumpthings-token': token}, json=data,)
if result.status_code >= 300:
msg = f'Error uploading {content_class}: {content_name}: {result.text}'
raise RuntimeError(msg)
if __name__ == '__main__':
sys.exit(main())