Add tests #22

Merged
christian-monch merged 14 commits from nf-add-tests into master 2022-02-28 18:47:12 +00:00
4 changed files with 340 additions and 125 deletions

.github/workflows/run-server-tests.yml (new file)

@@ -0,0 +1,40 @@
name: Server tests

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.7]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install datalad-installer
          datalad-installer datalad git-annex
          export PATH="$PATH:/usr/share/miniconda/bin"
          echo which git-annex
          which git-annex
          echo which datalad
          which datalad
          if [ -f requirements-devel.txt ]; then pip install -r requirements-devel.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --ignore E501,E722,E402 --show-source --statistics
      - name: Test with pytest
        run: |
          export PATH="$PATH:/usr/share/miniconda/bin"
          git config --global user.email "github-test@example.com"
          git config --global user.name "Github Testscript"
          pytest

requirements-devel.txt (new file)

@@ -0,0 +1,5 @@
webtest
pytest
flake8
jinja2

store_data.py (modified)

@@ -1,15 +1,23 @@
 import hashlib
 import json
+import os
+import sys
 import time
 import subprocess
 from pathlib import Path
-from typing import Dict, List, Union
+from traceback import format_exception
+from typing import Dict, List, Tuple, Union
 from urllib.parse import parse_qs
 
-from jinja2 import Environment, PackageLoader, select_autoescape
+from jinja2 import Environment, select_autoescape
 
-# Those fields are required in the user input. They can either
+DATASET_ROOT_KEY = "de.inm7.sfb1451.entry.dataset_root"
+HOME_KEY = "de.inm7.sfb1451.entry.home"
+TEMPLATE_DIRECTORY_KEY = "de.inm7.sfb1451.entry.templates"
+
+# The following fields are required in the user input. They can either
 # come from the posted data or from the auto_fields-array.
 required_fields = [
     "form-data-version",
@@ -99,6 +107,7 @@ required_fields = [
 ]
 
+# Fields that are required, if subject-group == "patient" is True
 required_patient_fields = [
     "patient-year-first-symptom",
     "patient-month-first-symptom",
@@ -263,7 +272,10 @@ def add_file_to_dataset(dataset_root: Path, file: Path, home: Path):
             str(file)
         ],
        check=True,
-        env={"HOME": str(home)})
+        env={
+            **os.environ,
+            "HOME": str(home)
+        })
 
     return subprocess.run(
         [
@@ -325,7 +337,10 @@ def date_message(year, month, day):
     ])
 
-def create_result_page(commit_hash: str, time_stamp: float, json_top_data: dict, templates_directory: Path):
+def create_result_page(commit_hash: str,
+                       time_stamp: float,
+                       json_top_data: dict,
+                       templates_directory: Path):
     jinja_template_path = templates_directory / "success.html.jinja2"
     jinja_template = Environment(autoescape=select_autoescape()).from_string(jinja_template_path.read_text())
@@ -345,7 +360,7 @@ def get_string_content(_: str, field_content: List[str]) -> str:
     return field_content[0]
 
-def get_checkbox_content(field_name: str, field_content: List[str]) -> str:
+def get_checkbox_content(_: str, field_content: List[str]) -> str:
     return {
         "": "",
         "off": "False",
@@ -597,132 +612,169 @@ def get_canonic_content_string(field_set: Dict[str, List[str]]) -> str:
     return ";".join(field_strings)
 
 
+def encode_result_strings(result_strings: List[str]) -> List[bytes]:
+    return [element.encode("utf-8") for element in result_strings]
+
+
+def add_auto_fields(existing_fields: dict):
+    """Add auto fields to existing_fields, if they are not already present"""
+    for key, value in auto_fields.items():
+        if key not in existing_fields:
+            existing_fields[key] = value
+
+
+def read_mandatory_fields(mandatory_fields: List[str],
+                          input_data: Dict
+                          ) -> Tuple[Dict[str, str], List[str]]:
+    resulting_data = dict()
+    missing_keys = []
+    for key in mandatory_fields:
+        if key not in input_data:
+            missing_keys.append(key)
+        else:
+            resulting_data[key] = get_field_value(input_data, key)
+    return resulting_data, missing_keys
+
+
+def create_bad_request_result(lines: List[str]):
+    return (
+        "400 BAD REQUEST",
+        "text/plain; charset=utf-8",
+        encode_result_strings(lines))
+
+
+def create_missing_key_result(missing_keys: List[str]):
+    return create_bad_request_result([
+        "The following keys are missing from the request:\n",
+        "\n".join(missing_keys),
+        "\n"])
+
+
 def application(environ, start_response):
-    dataset_root = Path(environ["de.inm7.sfb1451.entry.dataset_root"])
-    home = Path(environ["de.inm7.sfb1451.entry.home"])
-    template_directory = Path(environ["de.inm7.sfb1451.entry.templates"])
-
-    request_method = environ["REQUEST_METHOD"]
-    if request_method == "POST":
-        try:
-            request_body_size = int(environ.get("CONTENT_LENGTH", 0))
-        except ValueError:
-            request_body_size = 0
-
-        environment = [f"{key}: {value}" for key, value in environ.items()]
-
-        request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
-        entered_data = parse_qs(request_body)
-
-        posted_data_string = "\n".join(
-            [f"{key}: {value}" for key, value in entered_data.items()])
-
-        # Check single results
-        for value in entered_data.values():
-            assert isinstance(value, list)
-            assert len(value) == 1
-
-        # Add auto fields to the entered data, if they are not already present
-        for key, value in auto_fields.items():
-            if key not in entered_data:
-                entered_data[key] = value
-
-        # Correct the optional checkbox fields
-        correct_optional_checkbox_fields(entered_data)
-
-        # Check the hash value
-        local_hash_string = get_canonic_content_string(entered_data)
-        if local_hash_string != entered_data["hashed-string"][0]:
-            status = "400 BAD REQUEST"
-            output = [
-                "Local hash input-string does not match submitted values\n".encode("utf-8"),
-                ("LOCAL: " + local_hash_string + "\n").encode(),
-                ("SENT: " + entered_data["hashed-string"][0] + "\n").encode()]
-            output_length = sum([len(line) for line in output])
-            response_headers = [('Content-type', 'text/plain; charset=utf-8'),
-                                ('Content-Length', str(output_length))]
-            start_response(status, response_headers)
-            return output
-
-        local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest()
-        if local_hash_value != entered_data["hash-value"][0]:
-            status = "400 BAD REQUEST"
-            output = ["Server side hash value does not match submitted hash value".encode("utf-8")]
-            output_length = sum([len(line) for line in output])
-            response_headers = [('Content-type', 'text/plain; charset=utf-8'),
-                                ('Content-Length', str(output_length))]
-            start_response(status, response_headers)
-            return output
-
-        # Create posted data dictionary
-        json_object = dict()
-
-        # Read the mandatory keys
-        for key in required_fields:
-            # This will throw an error, if the key is not available
-            json_object[key] = get_field_value(entered_data, key)
-
-        # Read keys dependent on subject-group
-        if json_object["subject-group"] == "patient":
-            for key in required_patient_fields:
-                # This will throw an error, if the key is not available
-                json_object[key] = get_field_value(entered_data, key)
-
-        time_stamp = time.time()
-        json_data = {
-            "source": {
-                "time_stamp": time_stamp,
-                "version": entered_data["form-data-version"][0],
-                "remote_address": environ["REMOTE_ADDR"],
-                "hashed-string": entered_data["hashed-string"][0],
-                "hash-value": entered_data["hash-value"][0],
-                "signature-data": (
-                    None
-                    if entered_data["signature-data"][0] == ""
-                    else entered_data["signature-data"][0]
-                )
-            },
-            "data": json_object
-        }
-
-        directory = dataset_root / "input" / json_data["source"]["version"]
-        directory.mkdir(parents=True, exist_ok=True)
-        output_file = directory / (str(time_stamp) + ".json")
-        with output_file.open("x") as f:
-            json.dump(json_data, f)
-
-        commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home)
-        result_message = create_result_page(commit_hash, time_stamp, json_data, template_directory)
-
-        status = "200 OK"
-
-        output = [
-            result_message.encode(),
-            #"2-------------\n".encode(),
-            #("\n".join(environment) + "\n").encode("utf-8"),
-            #"3-------------\n".encode(),
-            #(posted_data_string + "\n").encode("utf-8"),
-            #"4-------------\n".encode(),
-            #json.dumps(json_data, indent=4).encode(),
-            #"5-------------\n".encode(),
-            #(local_hash_string + "\n").encode(),
-            #(entered_data["hashed-string"][0] + "\n").encode(),
-            #(hashlib.sha256(local_hash_string.encode()).hexdigest() + "\n").encode(),
-            #f"==: {local_hash_string == entered_data['hashed-string'][0]}\n".encode()
-        ]
-    else:
-        status = "400 BAD REQUEST"
-        output = ["Only post method allowed".encode("utf-8")]
-
-    output_length = sum([len(line) for line in output])
-
-    response_headers = [('Content-type', 'text/html; charset=utf-8'),
-                        ('Content-Length', str(output_length))]
-    start_response(status, response_headers)
-
-    return output
+    try:
+        request_body_size = int(environ.get("CONTENT_LENGTH", 0))
+        request_body = environ["wsgi.input"].read(request_body_size).decode("utf-8")
+    except (ValueError, KeyError):
+        request_body_size = 0
+        request_body = ""
+
+    try:
+        status, content_type, content = protected_application(environ, request_body)
+    except:
+        status = "500 INTERNAL ERROR"
+        content_type = "text/plain; charset=utf-8"
+        content_strings = [
+            "An unexpected error occured during processing. If this error\n",
+            "persists, please send an email with the following information\n",
+            "to <c.moench@fz-juelich.de> or <m.szczepanik@fz-juelich.de>:\n",
+            "\n",
+            "--------\n",
+            "1. Stacktrace:\n",
+            "".join(format_exception(*sys.exc_info())),
+            "2. Environment:\n",
+            str(environ),
+            "\n",
+            f"3. WSGI input data ({request_body_size}):\n",
+            request_body,
+            "\n",
+            "--------\n"
+        ]
+        content = encode_result_strings(content_strings)
+
+    content_length = sum([len(line) for line in content])
+    response_headers = [
+        ('Content-type', content_type),
+        ('Content-Length', str(content_length))]
+    start_response(status, response_headers)
+    return content
+
+
+def protected_application(environ, request_body):
+    request_method = environ["REQUEST_METHOD"]
+    if request_method != "POST":
+        return create_bad_request_result(["Only POST is supported\n"])
+
+    dataset_root = Path(environ[DATASET_ROOT_KEY])
+    home = Path(environ[HOME_KEY])
+    template_directory = Path(environ[TEMPLATE_DIRECTORY_KEY])
+
+    # Parse data and check value structure
+    sent_data = parse_qs(request_body)
+    for value in sent_data.values():
+        if not isinstance(value, list) or not len(value) == 1:
+            raise ValueError(f"expected list of length one, got: {repr(value)}")
+
+    # Add auto fields to the sent data
+    add_auto_fields(sent_data)
+
+    # Correct the optional checkbox field in the sent data
+    correct_optional_checkbox_fields(sent_data)
+
+    # Read the mandatory keys into the result dictionary
+    entered_data_object, missing_keys = read_mandatory_fields(
+        required_fields,
+        sent_data)
+    if missing_keys:
+        return create_missing_key_result(missing_keys)
+
+    if entered_data_object["subject-group"] == "patient":
+        entered_patient_data, missing_patient_keys = read_mandatory_fields(
+            required_patient_fields,
+            sent_data)
+
+        entered_data_object.update(entered_patient_data)
+        missing_keys.extend(missing_patient_keys)
+
+    if missing_keys:
+        return create_missing_key_result(missing_keys)
+
+    # Check the hash value
+    local_hash_string = get_canonic_content_string(sent_data)
+    if local_hash_string != sent_data["hashed-string"][0]:
+        return create_bad_request_result([
+            "Local hash input-string does not match submitted values\n",
+            "LOCAL: " + local_hash_string + "\n",
+            "SENT: " + sent_data["hashed-string"][0] + "\n"])
+
+    local_hash_value = hashlib.sha256(local_hash_string.encode()).hexdigest()
+    if local_hash_value != sent_data["hash-value"][0]:
+        return create_bad_request_result([
+            "Server side hash value does not match submitted hash value\n"])
+
+    time_stamp = time.time()
+    result_object = {
+        "source": {
+            "time_stamp": time_stamp,
+            "version": sent_data["form-data-version"][0],
+            "remote_address": environ["REMOTE_ADDR"],
+            "hashed-string": sent_data["hashed-string"][0],
+            "hash-value": sent_data["hash-value"][0],
+            "signature-data": (
+                None
+                if sent_data["signature-data"][0] == ""
+                else sent_data["signature-data"][0]
+            )
+        },
+        "data": entered_data_object
+    }
+
+    directory = dataset_root / "input" / result_object["source"]["version"]
+    directory.mkdir(parents=True, exist_ok=True)
+    output_file = directory / (str(time_stamp) + ".json")
+    with output_file.open("x") as f:
+        json.dump(result_object, f)
+
+    commit_hash = add_file_to_dataset(dataset_root, directory / output_file, home)
+    result_message = create_result_page(commit_hash, time_stamp, result_object, template_directory)
+
+    return (
+        "200 OK",
+        "text/html; charset=utf-8",
+        encode_result_strings([result_message]))
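
For manual, local testing of the reworked module (an illustration, not part of this PR): application reads its configuration from the WSGI environ, so a small wrapper can inject the three de.inm7.sfb1451.entry.* keys before delegating. The sketch below assumes store_data.py is importable and uses placeholder paths.

# Local-testing sketch (not part of this PR): serve store_data.application
# with wsgiref and inject the configuration keys it expects in the environ.
# The three paths are placeholders; they must point to an existing DataLad
# dataset, a writable HOME, and the directory containing success.html.jinja2.
from wsgiref.simple_server import make_server

import store_data


def configured_application(environ, start_response):
    environ[store_data.DATASET_ROOT_KEY] = "/tmp/entry-dataset"
    environ[store_data.HOME_KEY] = "/tmp/entry-home"
    environ[store_data.TEMPLATE_DIRECTORY_KEY] = "/tmp/entry-templates"
    return store_data.application(environ, start_response)


if __name__ == "__main__":
    with make_server("localhost", 8080, configured_application) as server:
        server.serve_forever()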

@@ -0,0 +1,118 @@
import json
import os
import subprocess
import sys
import unittest
import tempfile
from pathlib import Path
from typing import List
from unittest.mock import patch
from webtest import TestApp
from webtest.app import AppError
server_dir = Path(__file__).parents[1]
template_dir = Path(__file__).parents[2] / "templates"
sys.path.insert(0, str(server_dir))
import store_data
from store_data import DATASET_ROOT_KEY, HOME_KEY, TEMPLATE_DIRECTORY_KEY
minimal_form_data = """form-data-version=2.2&data-entry-domain=de.sfb1451.z03&data-entry-employee=cm-test&project-code=b4&subject-pseudonym=test-111&date-of-birth=2000-01-01&sex=male&date-of-test=2020-01-01&subject-group=healthy&patient-year-first-symptom=&patient-month-first-symptom=&patient-day-first-symptom=&patient-year-diagnosis=&patient-month-diagnosis=&patient-day-diagnosis=&additional-remarks=&hashed-string=form-data-version%3A2.2%3Bdata-entry-domain%3Ade.sfb1451.z03%3Bdata-entry-employee%3Acm-test%3Bproject-code%3Ab4%3Bsubject-pseudonym%3Atest-111%3Bdate-of-birth%3A2000-01-01%3Bsex%3Amale%3Bdate-of-test%3A2020-01-01%3Brepeated-test%3AFalse%3Bpatient-year-first-symptom%3A%3Bpatient-month-first-symptom%3A%3Bpatient-day-first-symptom%3A%3Bpatient-year-diagnosis%3A%3Bpatient-month-diagnosis%3A%3Bpatient-day-diagnosis%3A%3Bpatient-main-disease%3A%3Bpatient-stronger-impacted-hand%3A%3Blaterality-quotient%3A%3Bmaximum-ftf-left%3A%3Bmaximum-ftf-right%3A%3Bmaximum-gs-left%3A%3Bmaximum-gs-right%3A%3Bpurdue-pegboard-left%3A%3Bpurdue-pegboard-right%3A%3Bturn-cards-left%3A%3Bturn-cards-right%3A%3Bsmall-things-left%3A%3Bsmall-things-right%3A%3Bsimulated-feeding-left%3A%3Bsimulated-feeding-right%3A%3Bcheckers-left%3A%3Bcheckers-right%3A%3Blarge-light-things-left%3A%3Blarge-light-things-right%3A%3Blarge-heavy-things-left%3A%3Blarge-heavy-things-right%3A%3Bjtt-incorrectly-executed%3A%3Barat-left%3A%3Barat-right%3A%3Btug-executed%3A%3Btug-a-incorrectly-executed%3A%3Btug-a-tools-required%3A%3Btug-imagined%3A%3Bgo-nogo-block-count%3A%3Bgo-nogo-total-errors%3A%3Bgo-nogo-wrong-errors%3A%3Bgo-nogo-recognized-errors%3A%3Bgo-nogo-correct-answer-time%3A%3Bgo-nogo-recognized-error-time%3A%3Bgo-nogo-incorrectly-executed%3A%3Bkas-pantomime-bukko-facial%3A%3Bkas-pantomime-arm-hand%3A%3Bkas-imitation-bukko-facial%3A%3Bkas-imitation-arm-hand%3A%3Bkopss-orientation%3A%3Bkopss-speech%3A%3Bkopss-praxie%3A%3Bkopss-visual-spatial-performance%3A%3Bkopss-calculating%3A%3Bkopss-executive-performance%3A%3Bkopss-memory%3A%3Bkopss-affect%3A%3Bkopss-behavior-observation%3A%3Bacl-k-loud-reading%3A%3Bacl-k-color-form-test%3A%3Bacl-k-supermarket-task%3A%3Bacl-k-communication-ability%3A%3Bbdi-ii-score%3A%3Bmadrs-score%3A%3Bdemtect-wordlist%3A%3Bdemtect-convert-numbers%3A%3Bdemtect-supermarket-task%3A%3Bdemtect-numbers-reverse%3A%3Bdemtect-wordlist-recall%3A%3Btime-tmt-a%3A%3Btmt-a-incorrectly-executed%3A%3Btime-tmt-b%3A%3Btmt-b-incorrectly-executed%3A%3Bmrs-score%3A%3Beuroqol-code%3A%3Beuroqol-vas%3A%3Bisced-value%3A%3Badditional-mrt-url%3A%3Badditional-mrt-resting-state%3A%3Badditional-mrt-tapping-task%3A%3Badditional-mrt-anatomical-representation%3A%3Badditional-mrt-dti%3A%3Badditional-eeg-url%3A%3Badditional-blood-sampling-url%3A%3Badditional-remarks%3A&hash-value=3bb998f5acf11ad82a17b3cef2c14258712a3b50c5efe7261e7792158e058ebe&signature-data="""
class TestFileTree(unittest.TestCase):
    def _test_exception_caught(self,
                               app_tester,
                               params,
                               extra_environ,
                               patterns: List[str]):
        with self.assertRaises(AppError) as context_manager:
            app_tester.post(
                url="/store-data",
                params=params,
                extra_environ=extra_environ)

        error_message = context_manager.exception.args[0]
        print(error_message)
        for pattern in patterns:
            self.assertTrue(error_message.find(pattern) > 0)

    def test_missing_environ_catching(self):
        app_tester = TestApp(store_data.application)
        self._test_exception_caught(
            app_tester=app_tester,
            params="Hello",
            extra_environ={},
            patterns=[
                "<c.moench@fz-juelich.de>",
                f"KeyError: '{DATASET_ROOT_KEY}'"])

    def test_missing_data_catching(self):
        app_tester = TestApp(store_data.application)
        self._test_exception_caught(
            app_tester=app_tester,
            params="Hello",
            extra_environ={
                DATASET_ROOT_KEY: "",
                HOME_KEY: "",
                TEMPLATE_DIRECTORY_KEY: ""
            },
            patterns=[
                "keys are missing",
                "project-code"])

    def test_data_storage(self):
        app_tester = TestApp(store_data.application)
        with tempfile.TemporaryDirectory() as temp_dir:
            with \
                    patch("store_data.add_file_to_dataset") as add_file_mock, \
                    patch("time.time") as time_mock:

                add_file_mock.return_value = 0
                time_mock.return_value = 0.0
                app_tester.post(
                    url="/store-data",
                    params=minimal_form_data,
                    extra_environ={
                        DATASET_ROOT_KEY: temp_dir,
                        HOME_KEY: os.environ["HOME"],
                        TEMPLATE_DIRECTORY_KEY: str(template_dir),
                        "REMOTE_ADDR": "1.2.3.4"
                    })

            expected_path = Path(temp_dir) / "input/2.2/0.0.json"
            with expected_path.open() as f:
                json_object = json.load(f)
            print(json_object)

    def test_datalad_saving(self):
        app_tester = TestApp(store_data.application)
        with tempfile.TemporaryDirectory() as temp_dir:
            dataset_path = Path(temp_dir) / "dataset"
            subprocess.run(["datalad", "create", "-c", "text2git", str(dataset_path)])
            subprocess.run(["datalad", "no-annex", "-d", str(dataset_path)])

            with patch("time.time") as time_mock:
                time_mock.return_value = 0.0
                app_tester.post(
                    url="/store-data",
                    params=minimal_form_data,
                    extra_environ={
                        DATASET_ROOT_KEY: str(dataset_path),
                        HOME_KEY: os.environ["HOME"],
                        TEMPLATE_DIRECTORY_KEY: str(template_dir),
                        "REMOTE_ADDR": "1.2.3.4"
                    })

            expected_path = dataset_path / "input/2.2/0.0.json"
            with expected_path.open() as f:
                json_object = json.load(f)
            print(json_object)
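
One prerequisite worth noting (an illustration, not part of this PR): test_datalad_saving shells out to datalad, which in turn needs git-annex; the workflow above installs both via datalad-installer. For a local run, a short pre-flight check avoids a confusing mid-test failure.

# Pre-flight sketch for local test runs (not part of this PR): verify that the
# external tools the datalad-based test relies on are available on PATH.
import shutil

for tool in ("datalad", "git-annex"):
    if shutil.which(tool) is None:
        raise SystemExit(f"{tool} not found on PATH; install it before running pytest")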