236 lines
7.3 KiB
Python
236 lines
7.3 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
from argparse import ArgumentParser
|
|
from itertools import count
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
from dump_things_service.instance_state import get_record_dir_config
|
|
|
|
|
|
# Command-line interface of this tool: one positional argument (the
# configuration file) and three optional arguments.
parser = ArgumentParser(
    prog='Establish a configuration in a running service',
    description=(
        'Read a configuration from a dump-things configuration-file and '
        'instantiate its elements on a running server. '
        'Objects that already exist on the server are left unchanged. '
        ' '
        'An admin token has to be provided in the environment variable '
        '`DTS_ADMIN_TOKEN`.'
    ),
)

# Positional: the YAML configuration file to read.
parser.add_argument('config_file', help='The path to the config file')

# Optional: the API to which the configuration is uploaded.
parser.add_argument('--send-to', help='The base URL of the server API')

# Optional flag: treat the input as an old-format configuration.
parser.add_argument(
    '--old-format',
    help=(
        'If provided, assume that the configuration is in the old format and '
        'convert it to the new format internally (in old format: tokens had no '
        '`hashed`-attribute and no `representation`-attribute, the token '
        'representation was the key of the token configuration, collections '
        'had no `schema`-attribute, and `sqlite`-backends had a '
        '`schema`-attribute).'
    ),
    action='store_true',
)

# Optional: store directory, only consulted together with `--old-format`.
parser.add_argument(
    '--store',
    help=(
        'If --old-format is provided, this option can be used to specify a store '
        'directory. The store directory will be used to load `RecordDir` '
        'configurations, if a collection defines are `RecordDir`-backend. '
        '(This option has no effect if no collection in the old configuration uses '
        'a `RecordDir`-backend.)'
    ),
    default=None,
)
|
|
|
|
|
|
def _print_error(message: str) -> None:
    """Write ``message`` to stderr and flush it immediately."""
    print(message, file=sys.stderr, flush=True)


def main():
    """Run the command line tool and return its exit status.

    The configuration file named on the command line is loaded with
    ``yaml.safe_load``. With ``--old-format`` it is converted to the new
    format first. If ``--send-to`` is given, the configuration is uploaded to
    that API using the admin token from the environment variable
    ``DTS_ADMIN_TOKEN``; otherwise the (possibly converted) configuration is
    printed to stdout as YAML.

    Returns:
        ``0`` on success, ``1`` if the configuration is invalid or no admin
        token is provided, ``2`` if an upload failed with a ``RuntimeError``.
    """
    arguments = parser.parse_args()

    with open(arguments.config_file) as config_file:
        configuration = yaml.safe_load(config_file)

    # Validate explicitly instead of with `assert`: assertions are removed
    # when Python runs with `-O`, and an empty file or a missing key would
    # otherwise surface as a bare TypeError/KeyError.
    if not isinstance(configuration, dict) or configuration.get('type') != 'collections':
        _print_error('`type: collections` missing in config-file')
        return 1

    if arguments.old_format:
        if configuration.get('version') != 1:
            _print_error('`version: 1` missing in old config-file')
            return 1
        try:
            configuration = convert_to_new_format(configuration, arguments.store)
        except ValueError as error:
            # Raised e.g. when `--store` is required but was not provided.
            _print_error(str(error))
            return 1
    elif arguments.store:
        _print_error(
            'Warning: ignoring `--store` option because `--old-format` '
            'is not provided.'
        )

    if configuration.get('version') != 2:
        _print_error('`version: 2` missing in config-file')
        return 1

    if not arguments.send_to:
        # No target server given: just show the resulting configuration.
        print(
            yaml.dump(
                data=configuration,
                sort_keys=False,
                allow_unicode=True,
                default_flow_style=False,
            )
        )
        return 0

    admin_token = os.environ.get('DTS_ADMIN_TOKEN')
    if not admin_token:
        _print_error(
            'An admin token not provided in the environment variable `DTS_ADMIN_TOKEN`'
        )
        return 1

    try:
        establish_configuration(
            configuration,
            # Endpoint paths are appended with a leading '/', so drop any
            # trailing slashes from the base URL.
            arguments.send_to.rstrip('/'),
            admin_token,
        )
    except RuntimeError as rte:
        _print_error(str(rte))
        return 2
    return 0
|
|
|
|
|
|
def convert_to_new_format(
    old_configuration: dict,
    store_path: str | Path | None,
) -> dict:
    """Convert a version-1 configuration into the version-2 layout.

    Old-format tokens are keyed by their representation. They are renamed to
    ``token_<n>`` (numbered from 1 in iteration order) and receive explicit
    ``representation`` and ``hashed: False`` entries. Each collection's
    ``default_token`` is rewritten to the new token name.

    For backends whose type starts with ``sqlite`` the ``schema`` entry is
    moved from the backend to the collection. For collections without a
    backend, or with a backend whose type starts with ``record_dir``, the
    schema and mapping method are taken from the record directory
    configuration found under ``store_path / <curated>``.

    Note: the collection dictionaries of ``old_configuration`` are modified
    in place and reused in the returned configuration.

    Args:
        old_configuration: The parsed old-format configuration.
        store_path: Store directory used to locate record directory
            configurations. May be ``None`` if no collection needs it.

    Returns:
        The new-format configuration with ``version: 2`` and an empty
        ``admin_tokens`` mapping.

    Raises:
        AssertionError: If ``old_configuration['version']`` is not 1.
        ValueError: If a collection requires a record directory
            configuration but ``store_path`` is ``None``.
    """
    assert old_configuration['version'] == 1, '`version: 1` missing in old config-file'

    new_tokens_dict = {
        f'token_{index}': {
            **old_token_config,
            'representation': token_representation,
            'hashed': False,
        }
        for index, (token_representation, old_token_config) in enumerate(
            old_configuration['tokens'].items(),
            start=1,
        )
    }

    # Old configurations reference tokens by representation, new ones by name.
    old_to_new_token_mapping = {
        token_config['representation']: token_name
        for token_name, token_config in new_tokens_dict.items()
    }

    # Only build a `Path` when a store was given: `Path(None)` raises a
    # TypeError, which would break conversions that never need the store and
    # would hide the explicit ValueError below.
    store_dir = None if store_path is None else Path(store_path)

    for collection_config in old_configuration['collections'].values():
        backend = collection_config.get('backend')
        if backend and backend['type'].startswith('sqlite'):
            # The schema moves from the backend to the collection itself.
            collection_config['schema'] = backend.pop('schema')
        elif not backend or backend['type'].startswith('record_dir'):
            if store_dir is None:
                msg = '--store <path> has to be provided to convert collection with record_dir-backends'
                raise ValueError(msg)
            record_dir_config = get_record_dir_config(store_dir / collection_config['curated'])
            collection_config['schema'] = record_dir_config.schema
            collection_config['backend'] = {
                # Collections without a backend get the `record_dir+stl` type.
                'type': backend['type'] if backend else 'record_dir+stl',
                'mapping_method': record_dir_config.idfx.value,
            }
        collection_config['default_token'] = old_to_new_token_mapping[collection_config['default_token']]

    return {
        'type': old_configuration['type'],
        'version': 2,
        'tokens': new_tokens_dict,
        'collections': old_configuration['collections'],
        'admin_tokens': {},
    }
|
|
|
|
|
|
def establish_configuration(
    configuration: dict,
    api_url: str,
    admin_token: str,
):
    """Upload all parts of ``configuration`` to the API at ``api_url``.

    Collections are created first, then tokens, then admin tokens. Every
    step receives the same configuration, API URL, and admin token.
    """
    upload_steps = (create_collections, create_tokens, create_admin_tokens)
    for upload in upload_steps:
        upload(configuration, api_url, admin_token)
|
|
|
|
|
|
def create_tokens(
    configuration: dict,
    api_url: str,
    admin_token: str,
):
    """POST every entry of ``configuration['tokens']`` to ``<api_url>/tokens``.

    The posted payload is the token's configuration with an added ``name``
    entry holding the token's key. Requests are authorized with
    ``admin_token``.
    """
    for name, settings in configuration['tokens'].items():
        endpoint = api_url + '/tokens'
        payload = {**settings, 'name': name}
        _post_data(
            url=endpoint,
            data=payload,
            token=admin_token,
            content_class='token',
            content_name=name,
        )
|
|
|
|
|
|
def create_collections(
    configuration: dict,
    api_url: str,
    admin_token: str,
):
    """POST every entry of ``configuration['collections']`` to ``<api_url>/collections``.

    The posted payload is the collection's configuration with an added
    ``name`` entry holding the collection's key. Requests are authorized
    with ``admin_token``.
    """
    for name, settings in configuration['collections'].items():
        endpoint = api_url + '/collections'
        payload = {**settings, 'name': name}
        _post_data(
            url=endpoint,
            data=payload,
            token=admin_token,
            content_class='collection',
            content_name=name,
        )
|
|
|
|
|
|
def create_admin_tokens(
    configuration: dict,
    api_url: str,
    admin_token: str,
):
    """POST every entry of ``configuration['admin_tokens']`` to ``<api_url>/admin_tokens``.

    The posted payload is the admin token's configuration with an added
    ``name`` entry holding its key. Requests are authorized with
    ``admin_token``.
    """
    for name, settings in configuration['admin_tokens'].items():
        endpoint = api_url + '/admin_tokens'
        payload = {**settings, 'name': name}
        _post_data(
            url=endpoint,
            data=payload,
            token=admin_token,
            content_class='admin token',
            content_name=name,
        )
|
|
|
|
|
|
def _post_data(
    url: str,
    data: dict,
    token: str,
    content_class: str,
    content_name: str,
    timeout: float = 60.0,
):
    """POST ``data`` as JSON to ``url`` and raise if the upload fails.

    Args:
        url: The endpoint to post to.
        data: The payload, sent as JSON body.
        token: Sent in the ``x-dumpthings-token`` request header.
        content_class: Kind of object being uploaded; used in error messages.
        content_name: Name of the object being uploaded; used in error
            messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may wait indefinitely on an unresponsive
            server.

    Raises:
        RuntimeError: If the request could not be performed (connection
            error, timeout, ...) or the response status code is 300 or
            higher.
    """
    try:
        result = requests.post(
            url,
            headers={'x-dumpthings-token': token},
            json=data,
            timeout=timeout,
        )
    except requests.RequestException as error:
        # Report transport failures the same way as rejected uploads, so
        # callers need to handle only one exception type.
        msg = f'Error uploading {content_class}: {content_name}: {error}'
        raise RuntimeError(msg) from error

    if result.status_code >= 300:
        msg = f'Error uploading {content_class}: {content_name}: {result.text}'
        raise RuntimeError(msg)
|
|
|
|
|
|
if __name__ == '__main__':
    # Hand the status returned by main() to the interpreter as exit code.
    raise SystemExit(main())
|