Add tests #22
4 changed files with 340 additions and 125 deletions
40
.github/workflows/run-server-tests.yml
vendored
Normal file
40
.github/workflows/run-server-tests.yml
vendored
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
|
||||||
|
name: Server tests
|
||||||
|
|
||||||
|
on: [push]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
python-version: [3.7]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v2
|
||||||
|
- name: Set up Python ${{ matrix.python-version }}
|
||||||
|
uses: actions/setup-python@v2
|
||||||
|
with:
|
||||||
|
python-version: ${{ matrix.python-version }}
|
||||||
|
- name: Install dependencies
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
python -m pip install datalad-installer
|
||||||
|
datalad-installer datalad git-annex
|
||||||
|
export PATH="$PATH:/usr/share/miniconda/bin"
|
||||||
|
echo which git-annex
|
||||||
|
which git-annex
|
||||||
|
echo which datalad
|
||||||
|
which datalad
|
||||||
|
if [ -f requirements-devel.txt ]; then pip install -r requirements-devel.txt; fi
|
||||||
|
- name: Lint with flake8
|
||||||
|
run: |
|
||||||
|
# stop the build if there are Python syntax errors or undefined names
|
||||||
|
flake8 . --count --ignore E501,E722,E402 --show-source --statistics
|
||||||
|
- name: Test with pytest
|
||||||
|
run: |
|
||||||
|
export PATH="$PATH:/usr/share/miniconda/bin"
|
||||||
|
git config --global user.email "github-test@example.com"
|
||||||
|
git config --global user.name "Github Testscript"
|
||||||
|
pytest
|
||||||
5
requirements-devel.txt
Normal file
5
requirements-devel.txt
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
webtest
|
||||||
|
pytest
|
||||||
|
flake8
|
||||||
|
jinja2
|
||||||
|
|
||||||
|
|
@ -1,15 +1,23 @@
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
import time
|
import time
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Union
|
from traceback import format_exception
|
||||||
|
from typing import Dict, List, Tuple, Union
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
|
|
||||||
from jinja2 import Environment, PackageLoader, select_autoescape
|
from jinja2 import Environment, select_autoescape
|
||||||
|
|
||||||
|
|
||||||
# Those fields are required in the user input. They can either
|
DATASET_ROOT_KEY = "de.inm7.sfb1451.entry.dataset_root"
|
||||||
|
HOME_KEY = "de.inm7.sfb1451.entry.home"
|
||||||
|
TEMPLATE_DIRECTORY_KEY = "de.inm7.sfb1451.entry.templates"
|
||||||
|
|
||||||
|
|
||||||
|
# The following fields are required in the user input. They can either
|
||||||
# come from the posted data or from the auto_fields-array.
|
# come from the posted data or from the auto_fields-array.
|
||||||
required_fields = [
|
required_fields = [
|
||||||
"form-data-version",
|
"form-data-version",
|
||||||
|
|
@ -99,6 +107,7 @@ required_fields = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# Fields that are required, if subject-group == "patient" is True
|
||||||
required_patient_fields = [
|
required_patient_fields = [
|
||||||
"patient-year-first-symptom",
|
"patient-year-first-symptom",
|
||||||
"patient-month-first-symptom",
|
"patient-month-first-symptom",
|
||||||
|
|
@ -263,7 +272,10 @@ def add_file_to_dataset(dataset_root: Path, file: Path, home: Path):
|
||||||
str(file)
|
str(file)
|
||||||
],
|
],
|
||||||
check=True,
|
check=True,
|
||||||
env={"HOME": str(home)})
|
env={
|
||||||
|
**os.environ,
|
||||||
|
"HOME": str(home)
|
||||||
|
})
|
||||||
|
|
||||||
return subprocess.run(
|
return subprocess.run(
|
||||||
[
|
[
|
||||||
|
|
@ -325,7 +337,10 @@ def date_message(year, month, day):
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
def create_result_page(commit_hash: str, time_stamp: float, json_top_data: dict, templates_directory: Path):
|
def create_result_page(commit_hash: str,
|
||||||
|
time_stamp: float,
|
||||||
|
json_top_data: dict,
|
||||||
|
templates_directory: Path):
|
||||||
|
|
||||||
jinja_template_path = templates_directory / "success.html.jinja2"
|
jinja_template_path = templates_directory / "success.html.jinja2"
|
||||||
jinja_template = Environment(autoescape=select_autoescape()).from_string(jinja_template_path.read_text())
|
jinja_template = Environment(autoescape=select_autoescape()).from_string(jinja_template_path.read_text())
|
||||||
|
|
@ -345,7 +360,7 @@ def get_string_content(_: str, field_content: List[str]) -> str:
|
||||||
return field_content[0]
|
return field_content[0]
|
||||||
|
|
||||||
|
|
||||||
def get_checkbox_content(field_name: str, field_content: List[str]) -> str:
|
def get_checkbox_content(_: str, field_content: List[str]) -> str:
|
||||||
return {
|
return {
|
||||||
"": "",
|
"": "",
|
||||||
"off": "False",
|
"off": "False",
|
||||||
|
|
@ -597,132 +612,169 @@ def get_canonic_content_string(field_set: Dict[str, List[str]]) -> str:
|
||||||
return ";".join(field_strings)
|
return ";".join(field_strings)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_result_strings(result_strings: List[str]) -> List[bytes]:
|
||||||
|
return [element.encode("utf-8") for element in result_strings]
|
||||||
|
|
||||||
|
|
||||||
|
def add_auto_fields(existing_fields: dict):
|
||||||
|
"""Add auto fields to existing_fields, if they are not already present"""
|
||||||
|
for key, value in auto_fields.items():
|
||||||
|
if key not in existing_fields:
|
||||||
|
existing_fields[key] = value
|
||||||
|
|
||||||
|
|
||||||
|
def read_mandatory_fields(mandatory_fields: List[str],
|
||||||
|
input_data: Dict
|
||||||
|
) -> Tuple[Dict[str, str], List[str]]:
|
||||||
|
|
||||||
|
resulting_data = dict()
|
||||||
|
missing_keys = []
|
||||||
|
for key in mandatory_fields:
|
||||||
|
if key not in input_data:
|
||||||
|
missing_keys.append(key)
|
||||||
|
else:
|
||||||
|
resulting_data[key] = get_field_value(input_data, key)
|
||||||
|
return resulting_data, missing_keys
|
||||||
|
|
||||||
|
|
||||||
|
def create_bad_request_result(lines: List[str]):
|
||||||
|
return (
|
||||||
|
"400 BAD REQUEST",
|
||||||
|
"text/plain; charset=utf-8",
|
||||||
|
encode_result_strings(lines))
|
||||||
|
|
||||||
|
|
||||||
|
def create_missing_key_result(missing_keys: List[str]):
|
||||||
|
return create_bad_request_result([
|
||||||
|
"The following keys are missing from the request:\n",
|
||||||
|
"\n".join(missing_keys),
|
||||||
|
"\n"])
|
||||||
|
|
||||||
|
|
||||||
def application(environ, start_response):
|
def application(environ, start_response):
|
||||||
|
|
||||||
dataset_root = Path(environ["de.inm7.sfb1451.entry.dataset_root"])
|
try:
|
||||||
home = Path(environ["de.inm7.sfb1451.entry.home"])
|
request_body_size = int(environ.get("CONTENT_LENGTH", 0))
|
||||||
template_directory = Path(environ["de.inm7.sfb1451.entry.templates"])
|
request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
||||||
|
except (ValueError, KeyError):
|
||||||
|
request_body_size = 0
|
||||||
|
request_body = ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
status, content_type, content = protected_application(environ, request_body)
|
||||||
|
except:
|
||||||
|
status = "500 INTERNAL ERROR"
|
||||||
|
content_type = "text/plain; charset=utf-8"
|
||||||
|
content_strings = [
|
||||||
|
"An unexpected error occured during processing. If this error\n",
|
||||||
|
"persists, please send an email with the following information\n",
|
||||||
|
"to <c.moench@fz-juelich.de> or <m.szczepanik@fz-juelich.de>:\n",
|
||||||
|
"\n",
|
||||||
|
"--------\n",
|
||||||
|
"1. Stacktrace:\n",
|
||||||
|
"".join(format_exception(*sys.exc_info())),
|
||||||
|
"2. Environment:\n",
|
||||||
|
str(environ),
|
||||||
|
"\n",
|
||||||
|
f"3. WSGI input data ({request_body_size}):\n",
|
||||||
|
request_body,
|
||||||
|
"\n",
|
||||||
|
"--------\n"
|
||||||
|
]
|
||||||
|
content = encode_result_strings(content_strings)
|
||||||
|
|
||||||
|
content_length = sum([len(line) for line in content])
|
||||||
|
response_headers = [
|
||||||
|
('Content-type', content_type),
|
||||||
|
('Content-Length', str(content_length))]
|
||||||
|
|
||||||
|
start_response(status, response_headers)
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
def protected_application(environ, request_body):
|
||||||
|
|
||||||
request_method = environ["REQUEST_METHOD"]
|
request_method = environ["REQUEST_METHOD"]
|
||||||
if request_method == "POST":
|
if request_method != "POST":
|
||||||
try:
|
return create_bad_request_result(["Only POST is supported\n"])
|
||||||
request_body_size = int(environ.get("CONTENT_LENGTH", 0))
|
|
||||||
except ValueError:
|
|
||||||
request_body_size = 0
|
|
||||||
|
|
||||||
environment = [f"{key}: {value}" for key, value in environ.items()]
|
dataset_root = Path(environ[DATASET_ROOT_KEY])
|
||||||
|
home = Path(environ[HOME_KEY])
|
||||||
|
template_directory = Path(environ[TEMPLATE_DIRECTORY_KEY])
|
||||||
|
|
||||||
request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
|
# Parse data and check value structure
|
||||||
entered_data = parse_qs(request_body)
|
sent_data = parse_qs(request_body)
|
||||||
|
for value in sent_data.values():
|
||||||
|
if not isinstance(value, list) or not len(value) == 1:
|
||||||
|
raise ValueError(f"expected list of length one, got: {repr(value)}")
|
||||||
|
|
||||||
posted_data_string = "\n".join(
|
# Add auto fields to the sent data
|
||||||
[f"{key}: {value}" for key, value in entered_data.items()])
|
add_auto_fields(sent_data)
|
||||||
|
|
||||||
# Check single results
|
# Correct the optional checkbox field in the sent data
|
||||||
for value in entered_data.values():
|
correct_optional_checkbox_fields(sent_data)
|
||||||
assert isinstance(value, list)
|
|
||||||
assert len(value) == 1
|
|
||||||
|
|
||||||
# Add auto fields to the entered data, if they are not already present
|
# Read the mandatory keys into the result dictionary
|
||||||
for key, value in auto_fields.items():
|
entered_data_object, missing_keys = read_mandatory_fields(
|
||||||
if key not in entered_data:
|
required_fields,
|
||||||
entered_data[key] = value
|
sent_data)
|
||||||
|
|
||||||
# Correct the optional checkbox fields
|
if missing_keys:
|
||||||
correct_optional_checkbox_fields(entered_data)
|
return create_missing_key_result(missing_keys)
|
||||||
|
|
||||||
# Check the hash value
|
if entered_data_object["subject-group"] == "patient":
|
||||||
local_hash_string = get_canonic_content_string(entered_data)
|
entered_patient_data, missing_patient_keys = read_mandatory_fields(
|
||||||
if local_hash_string != entered_data["hashed-string"][0]:
|
required_patient_fields,
|
||||||
status = "400 BAD REQUEST"
|
sent_data)
|
||||||
output = [
|
|
||||||
"Local hash input-string does not match submitted values\n".encode("utf-8"),
|
|
||||||
("LOCAL: " + local_hash_string + "\n").encode(),
|
|
||||||
("SENT: " + entered_data["hashed-string"][0] + "\n").encode()]
|
|
||||||
output_length = sum([len(line) for line in output])
|
|
||||||
response_headers = [('Content-type', 'text/plain; charset=utf-8'),
|
|
||||||
('Content-Length', str(output_length))]
|
|
||||||
start_response(status, response_headers)
|
|
||||||
return output
|
|
||||||
|
|
||||||
local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest()
|
entered_data_object.update(entered_patient_data)
|
||||||
if local_hash_value != entered_data["hash-value"][0]:
|
missing_keys.extend(missing_patient_keys)
|
||||||
status = "400 BAD REQUEST"
|
|
||||||
output = ["Server side hash value does not match submitted hash value".encode("utf-8")]
|
|
||||||
output_length = sum([len(line) for line in output])
|
|
||||||
response_headers = [('Content-type', 'text/plain; charset=utf-8'),
|
|
||||||
('Content-Length', str(output_length))]
|
|
||||||
start_response(status, response_headers)
|
|
||||||
return output
|
|
||||||
|
|
||||||
# Create posted data dictionary
|
if missing_keys:
|
||||||
json_object = dict()
|
return create_missing_key_result(missing_keys)
|
||||||
|
|
||||||
# Read the mandatory keys
|
# Check the hash value
|
||||||
for key in required_fields:
|
local_hash_string = get_canonic_content_string(sent_data)
|
||||||
# This will throw an error, if the key is not available
|
if local_hash_string != sent_data["hashed-string"][0]:
|
||||||
json_object[key] = get_field_value(entered_data, key)
|
return create_bad_request_result([
|
||||||
|
"Local hash input-string does not match submitted values\n",
|
||||||
|
"LOCAL: " + local_hash_string + "\n",
|
||||||
|
"SENT: " + sent_data["hashed-string"][0] + "\n"])
|
||||||
|
|
||||||
# Read keys dependent on subject-group
|
local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest()
|
||||||
if json_object["subject-group"] == "patient":
|
if local_hash_value != sent_data["hash-value"][0]:
|
||||||
for key in required_patient_fields:
|
return create_bad_request_result([
|
||||||
# This will throw an error, if the key is not available
|
"Server side hash value does not match submitted hash value\n"])
|
||||||
json_object[key] = get_field_value(entered_data, key)
|
|
||||||
|
|
||||||
time_stamp = time.time()
|
time_stamp = time.time()
|
||||||
|
|
||||||
json_data = {
|
result_object = {
|
||||||
"source": {
|
"source": {
|
||||||
"time_stamp": time_stamp,
|
"time_stamp": time_stamp,
|
||||||
"version": entered_data["form-data-version"][0],
|
"version": sent_data["form-data-version"][0],
|
||||||
"remote_address": environ["REMOTE_ADDR"],
|
"remote_address": environ["REMOTE_ADDR"],
|
||||||
"hashed-string": entered_data["hashed-string"][0],
|
"hashed-string": sent_data["hashed-string"][0],
|
||||||
"hash-value": entered_data["hash-value"][0],
|
"hash-value": sent_data["hash-value"][0],
|
||||||
"signature-data": (
|
"signature-data": (
|
||||||
None
|
None
|
||||||
if entered_data["signature-data"][0] == ""
|
if sent_data["signature-data"][0] == ""
|
||||||
else entered_data["signature-data"][0]
|
else sent_data["signature-data"][0]
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
"data": json_object
|
"data": entered_data_object
|
||||||
}
|
}
|
||||||
|
|
||||||
directory = dataset_root / "input" / json_data["source"]["version"]
|
directory = dataset_root / "input" / result_object["source"]["version"]
|
||||||
directory.mkdir(parents=True, exist_ok=True)
|
directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
output_file = directory / (str(time_stamp) + ".json")
|
output_file = directory / (str(time_stamp) + ".json")
|
||||||
with output_file.open("x") as f:
|
with output_file.open("x") as f:
|
||||||
json.dump(json_data, f)
|
json.dump(result_object, f)
|
||||||
|
|
||||||
commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home)
|
commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home)
|
||||||
|
|
||||||
result_message = create_result_page(commit_hash, time_stamp, json_data, template_directory)
|
result_message = create_result_page(commit_hash, time_stamp, result_object, template_directory)
|
||||||
|
return (
|
||||||
status = "200 OK"
|
"200 OK",
|
||||||
|
"text/html; charset=utf-8",
|
||||||
output = [
|
encode_result_strings([result_message]))
|
||||||
result_message.encode(),
|
|
||||||
#"2-------------\n".encode(),
|
|
||||||
#("\n".join(environment) + "\n").encode("utf-8"),
|
|
||||||
#"3-------------\n".encode(),
|
|
||||||
#(posted_data_string + "\n").encode("utf-8"),
|
|
||||||
#"4-------------\n".encode(),
|
|
||||||
#json.dumps(json_data, indent=4).encode(),
|
|
||||||
#"5-------------\n".encode(),
|
|
||||||
#(local_hash_string + "\n").encode(),
|
|
||||||
#(entered_data["hashed-string"][0] + "\n").encode(),
|
|
||||||
#(hashlib.sha256(local_hash_string.encode()).hexdigest() + "\n").encode(),
|
|
||||||
#f"==: {local_hash_string == entered_data['hashed-string'][0]}\n".encode()
|
|
||||||
]
|
|
||||||
|
|
||||||
else:
|
|
||||||
|
|
||||||
status = "400 BAD REQUEST"
|
|
||||||
output = ["Only post method allowed".encode("utf-8")]
|
|
||||||
|
|
||||||
output_length = sum([len(line) for line in output])
|
|
||||||
response_headers = [('Content-type', 'text/html; charset=utf-8'),
|
|
||||||
('Content-Length', str(output_length))]
|
|
||||||
start_response(status, response_headers)
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
|
||||||
118
server/tests/test_store_data.py
Normal file
118
server/tests/test_store_data.py
Normal file
|
|
@ -0,0 +1,118 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from webtest import TestApp
|
||||||
|
from webtest.app import AppError
|
||||||
|
|
||||||
|
|
||||||
|
server_dir = Path(__file__).parents[1]
|
||||||
|
template_dir = Path(__file__).parents[2] / "templates"
|
||||||
|
|
||||||
|
sys.path.insert(0, str(server_dir))
|
||||||
|
|
||||||
|
import store_data
|
||||||
|
from store_data import DATASET_ROOT_KEY, HOME_KEY, TEMPLATE_DIRECTORY_KEY
|
||||||
|
|
||||||
|
|
||||||
|
minimal_form_data = """form-data-version=2.2&data-entry-domain=de.sfb1451.z03&data-entry-employee=cm-test&project-code=b4&subject-pseudonym=test-111&date-of-birth=2000-01-01&sex=male&date-of-test=2020-01-01&subject-group=healthy&patient-year-first-symptom=&patient-month-first-symptom=&patient-day-first-symptom=&patient-year-diagnosis=&patient-month-diagnosis=&patient-day-diagnosis=&additional-remarks=&hashed-string=form-data-version%3A2.2%3Bdata-entry-domain%3Ade.sfb1451.z03%3Bdata-entry-employee%3Acm-test%3Bproject-code%3Ab4%3Bsubject-pseudonym%3Atest-111%3Bdate-of-birth%3A2000-01-01%3Bsex%3Amale%3Bdate-of-test%3A2020-01-01%3Brepeated-test%3AFalse%3Bpatient-year-first-symptom%3A%3Bpatient-month-first-symptom%3A%3Bpatient-day-first-symptom%3A%3Bpatient-year-diagnosis%3A%3Bpatient-month-diagnosis%3A%3Bpatient-day-diagnosis%3A%3Bpatient-main-disease%3A%3Bpatient-stronger-impacted-hand%3A%3Blaterality-quotient%3A%3Bmaximum-ftf-left%3A%3Bmaximum-ftf-right%3A%3Bmaximum-gs-left%3A%3Bmaximum-gs-right%3A%3Bpurdue-pegboard-left%3A%3Bpurdue-pegboard-right%3A%3Bturn-cards-left%3A%3Bturn-cards-right%3A%3Bsmall-things-left%3A%3Bsmall-things-right%3A%3Bsimulated-feeding-left%3A%3Bsimulated-feeding-right%3A%3Bcheckers-left%3A%3Bcheckers-right%3A%3Blarge-light-things-left%3A%3Blarge-light-things-right%3A%3Blarge-heavy-things-left%3A%3Blarge-heavy-things-right%3A%3Bjtt-incorrectly-executed%3A%3Barat-left%3A%3Barat-right%3A%3Btug-executed%3A%3Btug-a-incorrectly-executed%3A%3Btug-a-tools-required%3A%3Btug-imagined%3A%3Bgo-nogo-block-count%3A%3Bgo-nogo-total-errors%3A%3Bgo-nogo-wrong-errors%3A%3Bgo-nogo-recognized-errors%3A%3Bgo-nogo-correct-answer-time%3A%3Bgo-nogo-recognized-error-time%3A%3Bgo-nogo-incorrectly-executed%3A%3Bkas-pantomime-bukko-facial%3A%3Bkas-pantomime-arm-hand%3A%3Bkas-imitation-bukko-facial%3A%3Bkas-imitation-arm-hand%3A%3Bkopss-orientation%3A%3Bkopss-speech%3A%3Bkopss-praxie%3A%3Bkopss-visual-spatial-performance%3A%3Bkopss-calculating%3A%3Bkopss-executive-performance%3A%3Bkopss-memory%3A%3Bkopss-affect%3A%3Bkopss-behavior-observation%3A%3Bacl-k-loud-reading%3A%3Bacl-k-color-form-test%3A%3Bacl-k-supermarket-task%3A%3Bacl-k-communication-ability%3A%3Bbdi-ii-score%3A%3Bmadrs-score%3A%3Bdemtect-wordlist%3A%3Bdemtect-convert-numbers%3A%3Bdemtect-supermarket-task%3A%3Bdemtect-numbers-reverse%3A%3Bdemtect-wordlist-recall%3A%3Btime-tmt-a%3A%3Btmt-a-incorrectly-executed%3A%3Btime-tmt-b%3A%3Btmt-b-incorrectly-executed%3A%3Bmrs-score%3A%3Beuroqol-code%3A%3Beuroqol-vas%3A%3Bisced-value%3A%3Badditional-mrt-url%3A%3Badditional-mrt-resting-state%3A%3Badditional-mrt-tapping-task%3A%3Badditional-mrt-anatomical-representation%3A%3Badditional-mrt-dti%3A%3Badditional-eeg-url%3A%3Badditional-blood-sampling-url%3A%3Badditional-remarks%3A&hash-value=3bb998f5acf11ad82a17b3cef2c14258712a3b50c5efe7261e7792158e058ebe&signature-data="""
|
||||||
|
|
||||||
|
|
||||||
|
class TestFileTree(unittest.TestCase):
|
||||||
|
|
||||||
|
def _test_exception_caught(self,
|
||||||
|
app_tester,
|
||||||
|
params,
|
||||||
|
extra_environ,
|
||||||
|
patterns: List[str]):
|
||||||
|
|
||||||
|
with self.assertRaises(AppError) as context_manager:
|
||||||
|
app_tester.post(
|
||||||
|
url="/store-data",
|
||||||
|
params=params,
|
||||||
|
extra_environ=extra_environ)
|
||||||
|
error_message = context_manager.exception.args[0]
|
||||||
|
print(error_message)
|
||||||
|
for pattern in patterns:
|
||||||
|
self.assertTrue(error_message.find(pattern) > 0)
|
||||||
|
|
||||||
|
def test_missing_environ_catching(self):
|
||||||
|
app_tester = TestApp(store_data.application)
|
||||||
|
self._test_exception_caught(
|
||||||
|
app_tester=app_tester,
|
||||||
|
params="Hello",
|
||||||
|
extra_environ={},
|
||||||
|
patterns=[
|
||||||
|
"<c.moench@fz-juelich.de>",
|
||||||
|
f"KeyError: '{DATASET_ROOT_KEY}'"])
|
||||||
|
|
||||||
|
def test_missing_data_catching(self):
|
||||||
|
app_tester = TestApp(store_data.application)
|
||||||
|
self._test_exception_caught(
|
||||||
|
app_tester=app_tester,
|
||||||
|
params="Hello",
|
||||||
|
extra_environ={
|
||||||
|
DATASET_ROOT_KEY: "",
|
||||||
|
HOME_KEY: "",
|
||||||
|
TEMPLATE_DIRECTORY_KEY: ""
|
||||||
|
},
|
||||||
|
patterns=[
|
||||||
|
"keys are missing",
|
||||||
|
"project-code"])
|
||||||
|
|
||||||
|
def test_data_storage(self):
|
||||||
|
app_tester = TestApp(store_data.application)
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
|
||||||
|
with \
|
||||||
|
patch("store_data.add_file_to_dataset") as add_file_mock, \
|
||||||
|
patch("time.time") as time_mock:
|
||||||
|
|
||||||
|
add_file_mock.return_value = 0
|
||||||
|
time_mock.return_value = 0.0
|
||||||
|
|
||||||
|
app_tester.post(
|
||||||
|
url="/store-data",
|
||||||
|
params=minimal_form_data,
|
||||||
|
extra_environ={
|
||||||
|
DATASET_ROOT_KEY: temp_dir,
|
||||||
|
HOME_KEY: os.environ["HOME"],
|
||||||
|
TEMPLATE_DIRECTORY_KEY: str(template_dir),
|
||||||
|
"REMOTE_ADDR": "1.2.3.4"
|
||||||
|
})
|
||||||
|
|
||||||
|
expected_path = Path(temp_dir) / "input/2.2/0.0.json"
|
||||||
|
with expected_path.open() as f:
|
||||||
|
json_object = json.load(f)
|
||||||
|
print(json_object)
|
||||||
|
|
||||||
|
def test_datalad_saving(self):
|
||||||
|
app_tester = TestApp(store_data.application)
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
|
||||||
|
dataset_path = Path(temp_dir) / "dataset"
|
||||||
|
subprocess.run(["datalad", "create", "-c", "text2git", str(dataset_path)])
|
||||||
|
subprocess.run(["datalad", "no-annex", "-d", str(dataset_path)])
|
||||||
|
|
||||||
|
with patch("time.time") as time_mock:
|
||||||
|
time_mock.return_value = 0.0
|
||||||
|
app_tester.post(
|
||||||
|
url="/store-data",
|
||||||
|
params=minimal_form_data,
|
||||||
|
extra_environ={
|
||||||
|
DATASET_ROOT_KEY: str(dataset_path),
|
||||||
|
HOME_KEY: os.environ["HOME"],
|
||||||
|
TEMPLATE_DIRECTORY_KEY: str(template_dir),
|
||||||
|
"REMOTE_ADDR": "1.2.3.4"
|
||||||
|
})
|
||||||
|
|
||||||
|
expected_path = dataset_path / "input/2.2/0.0.json"
|
||||||
|
with expected_path.open() as f:
|
||||||
|
json_object = json.load(f)
|
||||||
|
print(json_object)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue