Add tests #22

Merged
christian-monch merged 14 commits from nf-add-tests into master 2022-02-28 18:47:12 +00:00
4 changed files with 340 additions and 125 deletions

40
.github/workflows/run-server-tests.yml vendored Normal file
View file

@ -0,0 +1,40 @@
name: Server tests

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quote versions: unquoted YAML parses them as floats, so e.g.
        # a future "3.10" would silently become 3.1.
        python-version: ["3.7"]
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install datalad-installer
          datalad-installer datalad git-annex
          # datalad-installer puts the tools into the miniconda prefix.
          export PATH="$PATH:/usr/share/miniconda/bin"
          # Print the resolved tool locations for easier CI debugging.
          echo "which git-annex:"
          which git-annex
          echo "which datalad:"
          which datalad
          if [ -f requirements-devel.txt ]; then pip install -r requirements-devel.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --ignore E501,E722,E402 --show-source --statistics
      - name: Test with pytest
        run: |
          export PATH="$PATH:/usr/share/miniconda/bin"
          # git-annex/datalad need a configured committer identity.
          git config --global user.email "github-test@example.com"
          git config --global user.name "Github Testscript"
          pytest

5
requirements-devel.txt Normal file
View file

@ -0,0 +1,5 @@
# Development/test dependencies: pip install -r requirements-devel.txt
# webtest: WSGI application test client (wraps store_data.application in tests)
webtest
# pytest/flake8: test runner and linter used by the CI workflow
pytest
flake8
# jinja2: template engine imported by the server code itself
# NOTE(review): jinja2 looks like a runtime dependency, not only a
# development one -- confirm it is also listed for deployment.
jinja2

View file

@ -1,15 +1,23 @@
import hashlib import hashlib
import json import json
import os
import sys
import time import time
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Dict, List, Union from traceback import format_exception
from typing import Dict, List, Tuple, Union
from urllib.parse import parse_qs from urllib.parse import parse_qs
from jinja2 import Environment, PackageLoader, select_autoescape from jinja2 import Environment, select_autoescape
# Those fields are required in the user input. They can either DATASET_ROOT_KEY = "de.inm7.sfb1451.entry.dataset_root"
HOME_KEY = "de.inm7.sfb1451.entry.home"
TEMPLATE_DIRECTORY_KEY = "de.inm7.sfb1451.entry.templates"
# The following fields are required in the user input. They can either
# come from the posted data or from the auto_fields-array. # come from the posted data or from the auto_fields-array.
required_fields = [ required_fields = [
"form-data-version", "form-data-version",
@ -99,6 +107,7 @@ required_fields = [
] ]
# Fields that are required, if subject-group == "patient" is True
required_patient_fields = [ required_patient_fields = [
"patient-year-first-symptom", "patient-year-first-symptom",
"patient-month-first-symptom", "patient-month-first-symptom",
@ -263,7 +272,10 @@ def add_file_to_dataset(dataset_root: Path, file: Path, home: Path):
str(file) str(file)
], ],
check=True, check=True,
env={"HOME": str(home)}) env={
**os.environ,
"HOME": str(home)
})
return subprocess.run( return subprocess.run(
[ [
@ -325,7 +337,10 @@ def date_message(year, month, day):
]) ])
def create_result_page(commit_hash: str, time_stamp: float, json_top_data: dict, templates_directory: Path): def create_result_page(commit_hash: str,
time_stamp: float,
json_top_data: dict,
templates_directory: Path):
jinja_template_path = templates_directory / "success.html.jinja2" jinja_template_path = templates_directory / "success.html.jinja2"
jinja_template = Environment(autoescape=select_autoescape()).from_string(jinja_template_path.read_text()) jinja_template = Environment(autoescape=select_autoescape()).from_string(jinja_template_path.read_text())
@ -345,7 +360,7 @@ def get_string_content(_: str, field_content: List[str]) -> str:
return field_content[0] return field_content[0]
def get_checkbox_content(field_name: str, field_content: List[str]) -> str: def get_checkbox_content(_: str, field_content: List[str]) -> str:
return { return {
"": "", "": "",
"off": "False", "off": "False",
@ -597,132 +612,169 @@ def get_canonic_content_string(field_set: Dict[str, List[str]]) -> str:
return ";".join(field_strings) return ";".join(field_strings)
def encode_result_strings(result_strings: List[str]) -> List[bytes]:
return [element.encode("utf-8") for element in result_strings]
def add_auto_fields(existing_fields: dict):
"""Add auto fields to existing_fields, if they are not already present"""
for key, value in auto_fields.items():
if key not in existing_fields:
existing_fields[key] = value
def read_mandatory_fields(mandatory_fields: List[str],
input_data: Dict
) -> Tuple[Dict[str, str], List[str]]:
resulting_data = dict()
missing_keys = []
for key in mandatory_fields:
if key not in input_data:
missing_keys.append(key)
else:
resulting_data[key] = get_field_value(input_data, key)
return resulting_data, missing_keys
def create_bad_request_result(lines: List[str]):
return (
"400 BAD REQUEST",
"text/plain; charset=utf-8",
encode_result_strings(lines))
def create_missing_key_result(missing_keys: List[str]):
return create_bad_request_result([
"The following keys are missing from the request:\n",
"\n".join(missing_keys),
"\n"])
def application(environ, start_response): def application(environ, start_response):
dataset_root = Path(environ["de.inm7.sfb1451.entry.dataset_root"])
home = Path(environ["de.inm7.sfb1451.entry.home"])
template_directory = Path(environ["de.inm7.sfb1451.entry.templates"])
request_method = environ["REQUEST_METHOD"]
if request_method == "POST":
try: try:
request_body_size = int(environ.get("CONTENT_LENGTH", 0)) request_body_size = int(environ.get("CONTENT_LENGTH", 0))
except ValueError:
request_body_size = 0
environment = [f"{key}: {value}" for key, value in environ.items()]
request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8") request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
entered_data = parse_qs(request_body) except (ValueError, KeyError):
request_body_size = 0
request_body = ""
posted_data_string = "\n".join( try:
[f"{key}: {value}" for key, value in entered_data.items()]) status, content_type, content = protected_application(environ, request_body)
except:
status = "500 INTERNAL ERROR"
content_type = "text/plain; charset=utf-8"
content_strings = [
"An unexpected error occured during processing. If this error\n",
"persists, please send an email with the following information\n",
"to <c.moench@fz-juelich.de> or <m.szczepanik@fz-juelich.de>:\n",
"\n",
"--------\n",
"1. Stacktrace:\n",
"".join(format_exception(*sys.exc_info())),
"2. Environment:\n",
str(environ),
"\n",
f"3. WSGI input data ({request_body_size}):\n",
request_body,
"\n",
"--------\n"
]
content = encode_result_strings(content_strings)
# Check single results content_length = sum([len(line) for line in content])
for value in entered_data.values(): response_headers = [
assert isinstance(value, list) ('Content-type', content_type),
assert len(value) == 1 ('Content-Length', str(content_length))]
# Add auto fields to the entered data, if they are not already present start_response(status, response_headers)
for key, value in auto_fields.items(): return content
if key not in entered_data:
entered_data[key] = value
# Correct the optional checkbox fields
correct_optional_checkbox_fields(entered_data) def protected_application(environ, request_body):
request_method = environ["REQUEST_METHOD"]
if request_method != "POST":
return create_bad_request_result(["Only POST is supported\n"])
dataset_root = Path(environ[DATASET_ROOT_KEY])
home = Path(environ[HOME_KEY])
template_directory = Path(environ[TEMPLATE_DIRECTORY_KEY])
# Parse data and check value structure
sent_data = parse_qs(request_body)
for value in sent_data.values():
if not isinstance(value, list) or not len(value) == 1:
raise ValueError(f"expected list of length one, got: {repr(value)}")
# Add auto fields to the sent data
add_auto_fields(sent_data)
# Correct the optional checkbox field in the sent data
correct_optional_checkbox_fields(sent_data)
# Read the mandatory keys into the result dictionary
entered_data_object, missing_keys = read_mandatory_fields(
required_fields,
sent_data)
if missing_keys:
return create_missing_key_result(missing_keys)
if entered_data_object["subject-group"] == "patient":
entered_patient_data, missing_patient_keys = read_mandatory_fields(
required_patient_fields,
sent_data)
entered_data_object.update(entered_patient_data)
missing_keys.extend(missing_patient_keys)
if missing_keys:
return create_missing_key_result(missing_keys)
# Check the hash value # Check the hash value
local_hash_string = get_canonic_content_string(entered_data) local_hash_string = get_canonic_content_string(sent_data)
if local_hash_string != entered_data["hashed-string"][0]: if local_hash_string != sent_data["hashed-string"][0]:
status = "400 BAD REQUEST" return create_bad_request_result([
output = [ "Local hash input-string does not match submitted values\n",
"Local hash input-string does not match submitted values\n".encode("utf-8"), "LOCAL: " + local_hash_string + "\n",
("LOCAL: " + local_hash_string + "\n").encode(), "SENT: " + sent_data["hashed-string"][0] + "\n"])
("SENT: " + entered_data["hashed-string"][0] + "\n").encode()]
output_length = sum([len(line) for line in output])
response_headers = [('Content-type', 'text/plain; charset=utf-8'),
('Content-Length', str(output_length))]
start_response(status, response_headers)
return output
local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest() local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest()
if local_hash_value != entered_data["hash-value"][0]: if local_hash_value != sent_data["hash-value"][0]:
status = "400 BAD REQUEST" return create_bad_request_result([
output = ["Server side hash value does not match submitted hash value".encode("utf-8")] "Server side hash value does not match submitted hash value\n"])
output_length = sum([len(line) for line in output])
response_headers = [('Content-type', 'text/plain; charset=utf-8'),
('Content-Length', str(output_length))]
start_response(status, response_headers)
return output
# Create posted data dictionary
json_object = dict()
# Read the mandatory keys
for key in required_fields:
# This will throw an error, if the key is not available
json_object[key] = get_field_value(entered_data, key)
# Read keys dependent on subject-group
if json_object["subject-group"] == "patient":
for key in required_patient_fields:
# This will throw an error, if the key is not available
json_object[key] = get_field_value(entered_data, key)
time_stamp = time.time() time_stamp = time.time()
json_data = { result_object = {
"source": { "source": {
"time_stamp": time_stamp, "time_stamp": time_stamp,
"version": entered_data["form-data-version"][0], "version": sent_data["form-data-version"][0],
"remote_address": environ["REMOTE_ADDR"], "remote_address": environ["REMOTE_ADDR"],
"hashed-string": entered_data["hashed-string"][0], "hashed-string": sent_data["hashed-string"][0],
"hash-value": entered_data["hash-value"][0], "hash-value": sent_data["hash-value"][0],
"signature-data": ( "signature-data": (
None None
if entered_data["signature-data"][0] == "" if sent_data["signature-data"][0] == ""
else entered_data["signature-data"][0] else sent_data["signature-data"][0]
) )
}, },
"data": json_object "data": entered_data_object
} }
directory = dataset_root / "input" / json_data["source"]["version"] directory = dataset_root / "input" / result_object["source"]["version"]
directory.mkdir(parents=True, exist_ok=True) directory.mkdir(parents=True, exist_ok=True)
output_file = directory / (str(time_stamp) + ".json") output_file = directory / (str(time_stamp) + ".json")
with output_file.open("x") as f: with output_file.open("x") as f:
json.dump(json_data, f) json.dump(result_object, f)
commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home) commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home)
result_message = create_result_page(commit_hash, time_stamp, json_data, template_directory) result_message = create_result_page(commit_hash, time_stamp, result_object, template_directory)
return (
status = "200 OK" "200 OK",
"text/html; charset=utf-8",
output = [ encode_result_strings([result_message]))
result_message.encode(),
#"2-------------\n".encode(),
#("\n".join(environment) + "\n").encode("utf-8"),
#"3-------------\n".encode(),
#(posted_data_string + "\n").encode("utf-8"),
#"4-------------\n".encode(),
#json.dumps(json_data, indent=4).encode(),
#"5-------------\n".encode(),
#(local_hash_string + "\n").encode(),
#(entered_data["hashed-string"][0] + "\n").encode(),
#(hashlib.sha256(local_hash_string.encode()).hexdigest() + "\n").encode(),
#f"==: {local_hash_string == entered_data['hashed-string'][0]}\n".encode()
]
else:
status = "400 BAD REQUEST"
output = ["Only post method allowed".encode("utf-8")]
output_length = sum([len(line) for line in output])
response_headers = [('Content-type', 'text/html; charset=utf-8'),
('Content-Length', str(output_length))]
start_response(status, response_headers)
return output

View file

@ -0,0 +1,118 @@
import json
import os
import subprocess
import sys
import unittest
import tempfile
from pathlib import Path
from typing import List
from unittest.mock import patch
from webtest import TestApp
from webtest.app import AppError
server_dir = Path(__file__).parents[1]
template_dir = Path(__file__).parents[2] / "templates"
sys.path.insert(0, str(server_dir))
import store_data
from store_data import DATASET_ROOT_KEY, HOME_KEY, TEMPLATE_DIRECTORY_KEY
minimal_form_data = """form-data-version=2.2&data-entry-domain=de.sfb1451.z03&data-entry-employee=cm-test&project-code=b4&subject-pseudonym=test-111&date-of-birth=2000-01-01&sex=male&date-of-test=2020-01-01&subject-group=healthy&patient-year-first-symptom=&patient-month-first-symptom=&patient-day-first-symptom=&patient-year-diagnosis=&patient-month-diagnosis=&patient-day-diagnosis=&additional-remarks=&hashed-string=form-data-version%3A2.2%3Bdata-entry-domain%3Ade.sfb1451.z03%3Bdata-entry-employee%3Acm-test%3Bproject-code%3Ab4%3Bsubject-pseudonym%3Atest-111%3Bdate-of-birth%3A2000-01-01%3Bsex%3Amale%3Bdate-of-test%3A2020-01-01%3Brepeated-test%3AFalse%3Bpatient-year-first-symptom%3A%3Bpatient-month-first-symptom%3A%3Bpatient-day-first-symptom%3A%3Bpatient-year-diagnosis%3A%3Bpatient-month-diagnosis%3A%3Bpatient-day-diagnosis%3A%3Bpatient-main-disease%3A%3Bpatient-stronger-impacted-hand%3A%3Blaterality-quotient%3A%3Bmaximum-ftf-left%3A%3Bmaximum-ftf-right%3A%3Bmaximum-gs-left%3A%3Bmaximum-gs-right%3A%3Bpurdue-pegboard-left%3A%3Bpurdue-pegboard-right%3A%3Bturn-cards-left%3A%3Bturn-cards-right%3A%3Bsmall-things-left%3A%3Bsmall-things-right%3A%3Bsimulated-feeding-left%3A%3Bsimulated-feeding-right%3A%3Bcheckers-left%3A%3Bcheckers-right%3A%3Blarge-light-things-left%3A%3Blarge-light-things-right%3A%3Blarge-heavy-things-left%3A%3Blarge-heavy-things-right%3A%3Bjtt-incorrectly-executed%3A%3Barat-left%3A%3Barat-right%3A%3Btug-executed%3A%3Btug-a-incorrectly-executed%3A%3Btug-a-tools-required%3A%3Btug-imagined%3A%3Bgo-nogo-block-count%3A%3Bgo-nogo-total-errors%3A%3Bgo-nogo-wrong-errors%3A%3Bgo-nogo-recognized-errors%3A%3Bgo-nogo-correct-answer-time%3A%3Bgo-nogo-recognized-error-time%3A%3Bgo-nogo-incorrectly-executed%3A%3Bkas-pantomime-bukko-facial%3A%3Bkas-pantomime-arm-hand%3A%3Bkas-imitation-bukko-facial%3A%3Bkas-imitation-arm-hand%3A%3Bkopss-orientation%3A%3Bkopss-speech%3A%3Bkopss-praxie%3A%3Bkopss-visual-spatial-performance%3A%3Bkopss-calculating%3A%3Bkopss-executive-performa
nce%3A%3Bkopss-memory%3A%3Bkopss-affect%3A%3Bkopss-behavior-observation%3A%3Bacl-k-loud-reading%3A%3Bacl-k-color-form-test%3A%3Bacl-k-supermarket-task%3A%3Bacl-k-communication-ability%3A%3Bbdi-ii-score%3A%3Bmadrs-score%3A%3Bdemtect-wordlist%3A%3Bdemtect-convert-numbers%3A%3Bdemtect-supermarket-task%3A%3Bdemtect-numbers-reverse%3A%3Bdemtect-wordlist-recall%3A%3Btime-tmt-a%3A%3Btmt-a-incorrectly-executed%3A%3Btime-tmt-b%3A%3Btmt-b-incorrectly-executed%3A%3Bmrs-score%3A%3Beuroqol-code%3A%3Beuroqol-vas%3A%3Bisced-value%3A%3Badditional-mrt-url%3A%3Badditional-mrt-resting-state%3A%3Badditional-mrt-tapping-task%3A%3Badditional-mrt-anatomical-representation%3A%3Badditional-mrt-dti%3A%3Badditional-eeg-url%3A%3Badditional-blood-sampling-url%3A%3Badditional-remarks%3A&hash-value=3bb998f5acf11ad82a17b3cef2c14258712a3b50c5efe7261e7792158e058ebe&signature-data="""
class TestFileTree(unittest.TestCase):
    """End-to-end tests for the ``store_data`` WSGI application."""

    def _test_exception_caught(self,
                               app_tester,
                               params,
                               extra_environ,
                               patterns: List[str]):
        """POST *params* and assert the failing response mentions all *patterns*.

        The request is expected to raise ``webtest.app.AppError``; every
        string in *patterns* must occur in the resulting error message.
        """
        with self.assertRaises(AppError) as context_manager:
            app_tester.post(
                url="/store-data",
                params=params,
                extra_environ=extra_environ)
        error_message = context_manager.exception.args[0]
        for pattern in patterns:
            # assertIn also accepts a pattern at index 0 (str.find() returns
            # 0 there, which the previous ``find(...) > 0`` check wrongly
            # treated as a failure) and reports a useful failure message.
            self.assertIn(pattern, error_message)

    def test_missing_environ_catching(self):
        """Missing WSGI environ keys yield the internal-error page naming the key."""
        app_tester = TestApp(store_data.application)
        self._test_exception_caught(
            app_tester=app_tester,
            params="Hello",
            extra_environ={},
            patterns=[
                "<c.moench@fz-juelich.de>",
                f"KeyError: '{DATASET_ROOT_KEY}'"])

    def test_missing_data_catching(self):
        """A POST without the mandatory form fields is rejected with the key names."""
        app_tester = TestApp(store_data.application)
        self._test_exception_caught(
            app_tester=app_tester,
            params="Hello",
            extra_environ={
                DATASET_ROOT_KEY: "",
                HOME_KEY: "",
                TEMPLATE_DIRECTORY_KEY: ""
            },
            patterns=[
                "keys are missing",
                "project-code"])

    def test_data_storage(self):
        """A valid POST stores a JSON record at <root>/input/<version>/<ts>.json."""
        app_tester = TestApp(store_data.application)
        with tempfile.TemporaryDirectory() as temp_dir:
            with \
                    patch("store_data.add_file_to_dataset") as add_file_mock, \
                    patch("time.time") as time_mock:
                add_file_mock.return_value = 0
                time_mock.return_value = 0.0
                app_tester.post(
                    url="/store-data",
                    params=minimal_form_data,
                    extra_environ={
                        DATASET_ROOT_KEY: temp_dir,
                        HOME_KEY: os.environ["HOME"],
                        TEMPLATE_DIRECTORY_KEY: str(template_dir),
                        "REMOTE_ADDR": "1.2.3.4"
                    })
            expected_path = Path(temp_dir) / "input/2.2/0.0.json"
            with expected_path.open() as f:
                json_object = json.load(f)
            # Assert on the stored record instead of just printing it.
            self.assertEqual(json_object["source"]["time_stamp"], 0.0)
            self.assertEqual(json_object["source"]["remote_address"], "1.2.3.4")

    def test_datalad_saving(self):
        """A valid POST against a real datalad dataset stores the record."""
        app_tester = TestApp(store_data.application)
        with tempfile.TemporaryDirectory() as temp_dir:
            dataset_path = Path(temp_dir) / "dataset"
            # check=True so a broken datalad installation fails loudly here
            # instead of producing a confusing failure later in the test.
            subprocess.run(
                ["datalad", "create", "-c", "text2git", str(dataset_path)],
                check=True)
            # NOTE(review): "no-annex" does not look like a datalad
            # subcommand -- confirm the intended invocation (deliberately
            # left without check=True until that is clarified).
            subprocess.run(["datalad", "no-annex", "-d", str(dataset_path)])
            with patch("time.time") as time_mock:
                time_mock.return_value = 0.0
                app_tester.post(
                    url="/store-data",
                    params=minimal_form_data,
                    extra_environ={
                        DATASET_ROOT_KEY: str(dataset_path),
                        HOME_KEY: os.environ["HOME"],
                        TEMPLATE_DIRECTORY_KEY: str(template_dir),
                        "REMOTE_ADDR": "1.2.3.4"
                    })
            expected_path = dataset_path / "input/2.2/0.0.json"
            with expected_path.open() as f:
                json_object = json.load(f)
            self.assertEqual(json_object["source"]["time_stamp"], 0.0)