The test-fixture `fast_api_simple` now returns a tuple containing: - test_client instance - store path - admin token
142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
import freezegun
|
|
import pytest # noqa F401
|
|
|
|
from .. import HTTP_200_OK
|
|
from ..utils import cleaned_json
|
|
|
|
json_record = {'schema_type': 'abc:Person', 'pid': 'xyz:bbbb', 'given_name': 'John'}
|
|
json_record_out = {'schema_type': 'abc:Person', **json_record}
|
|
new_ttl_pid = 'xyz:cccc'
|
|
|
|
ttl_record = """@prefix abc: <http://example.org/person-schema/abc/> .
|
|
@prefix oxo: <http://purl.obolibrary.org/obo/> .
|
|
@prefix xyz: <http://example.org/person-schema/xyz/> .
|
|
|
|
xyz:HenryAdams a abc:Person ;
|
|
abc:annotations [ a abc:Annotation ;
|
|
abc:annotation_tag oxo:NCIT_C54269 ;
|
|
abc:annotation_value "test_user_1" ] ;
|
|
abc:given_name "Henryöäß" ;
|
|
abc:schema_type "abc:Person" .
|
|
"""
|
|
|
|
ttl_result_record_a = """@prefix abc: <http://example.org/person-schema/abc/> .
|
|
@prefix oxo: <http://purl.obolibrary.org/obo/> .
|
|
@prefix xyz: <http://example.org/person-schema/xyz/> .
|
|
|
|
xyz:HenryAdams a abc:Person ;
|
|
abc:annotations [ a abc:Annotation ;
|
|
abc:annotation_tag <http://semanticscience.org/resource/SIO_001083> ;
|
|
abc:annotation_value "1970-01-01T00:00:00" ],
|
|
[ a abc:Annotation ;
|
|
abc:annotation_tag oxo:NCIT_C54269 ;
|
|
abc:annotation_value "test_user_1" ] ;
|
|
abc:given_name "Henryöäß" ;
|
|
abc:schema_type "abc:Person" .
|
|
"""
|
|
|
|
ttl_result_record_b = """@prefix abc: <http://example.org/person-schema/abc/> .
|
|
@prefix oxo: <http://purl.obolibrary.org/obo/> .
|
|
@prefix xyz: <http://example.org/person-schema/xyz/> .
|
|
|
|
xyz:HenryAdams a abc:Person ;
|
|
abc:annotations [ a abc:Annotation ;
|
|
abc:annotation_tag oxo:NCIT_C54269 ;
|
|
abc:annotation_value "test_user_1" ],
|
|
[ a abc:Annotation ;
|
|
abc:annotation_tag <http://semanticscience.org/resource/SIO_001083> ;
|
|
abc:annotation_value "1970-01-01T00:00:00" ] ;
|
|
abc:given_name "Henryöäß" ;
|
|
abc:schema_type "abc:Person" .
|
|
"""
|
|
|
|
new_json_pid = 'xyz:HenryBaites'
|
|
|
|
|
|
def test_json_ttl_json(fastapi_client_simple):
|
|
test_client, _, _ = fastapi_client_simple
|
|
|
|
# Deposit JSON records
|
|
response = test_client.post(
|
|
'/collection_1/record/Person',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
json=json_record,
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
# Retrieve TTL records
|
|
response = test_client.get(
|
|
f'/collection_1/record?pid={json_record["pid"]}&format=ttl',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
ttl = response.text
|
|
|
|
# modify the pid
|
|
ttl = ttl.replace(json_record['pid'], new_ttl_pid)
|
|
|
|
response = test_client.post(
|
|
'/collection_1/record/Person?format=ttl',
|
|
headers={'content-type': 'text/turtle', 'x-dumpthings-token': 'token-1'},
|
|
data=ttl,
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
# Retrieve JSON record
|
|
response = test_client.get(
|
|
f'/collection_1/record?pid={new_ttl_pid}&format=json',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
json_object = cleaned_json(response.json(), remove_keys=('annotations',))
|
|
assert json_object != json_record_out
|
|
json_object['pid'] = json_record_out['pid']
|
|
assert json_object == json_record_out
|
|
|
|
|
|
@freezegun.freeze_time('1970-01-01')
|
|
def test_ttl_json_ttl(fastapi_client_simple):
|
|
test_client, _, _ = fastapi_client_simple
|
|
|
|
# Deposit a ttl record
|
|
response = test_client.post(
|
|
'/collection_1/record/Person?format=ttl',
|
|
headers={
|
|
'x-dumpthings-token': 'token-1',
|
|
'content-type': 'text/turtle',
|
|
},
|
|
data=ttl_record,
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
# Retrieve JSON records
|
|
response = test_client.get(
|
|
'/collection_1/record?pid=xyz:HenryAdams&format=json',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
json_object = response.json()
|
|
|
|
# modify the pid
|
|
json_object['pid'] = new_json_pid
|
|
|
|
response = test_client.post(
|
|
'/collection_1/record/Person?format=json',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
json=json_object,
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
|
|
# Retrieve ttl record
|
|
response = test_client.get(
|
|
f'/collection_1/record?pid={new_json_pid}&format=ttl',
|
|
headers={'x-dumpthings-token': 'token-1'},
|
|
)
|
|
assert response.status_code == HTTP_200_OK
|
|
assert (
|
|
response.text.strip()
|
|
== ttl_result_record_a.replace('xyz:HenryAdams', new_json_pid).strip()
|
|
) or (
|
|
response.text.strip()
|
|
== ttl_result_record_b.replace('xyz:HenryAdams', new_json_pid).strip()
|
|
)
|