"""Monkeypatch for linkml_runtime.loaders.rdflib_loader.RDFLibLoader.from_rdf_graph
|
|
|
|
Corresponds to the patch `patches/rdflib_loader_typedesignator.diff` in the
|
|
`datalad-concepts` repository, i.e.,
|
|
<https://github.com/psychoinformatics-de/datalad-concepts.git>
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from copy import copy
|
|
from dataclasses import dataclass
|
|
from importlib import import_module
|
|
from typing import (
|
|
Any,
|
|
Union,
|
|
)
|
|
|
|
from curies import Converter
|
|
from linkml_runtime import MappingError
|
|
from linkml_runtime.linkml_model import (
|
|
ClassDefinition,
|
|
ClassDefinitionName,
|
|
EnumDefinition,
|
|
SlotDefinition,
|
|
TypeDefinition,
|
|
)
|
|
from linkml_runtime.utils.formatutils import underscore
|
|
from linkml_runtime.utils.schemaview import SchemaView
|
|
from linkml_runtime.utils.yamlutils import YAMLRoot
|
|
from pydantic import BaseModel
|
|
from rdflib import (
|
|
Graph,
|
|
URIRef,
|
|
)
|
|
from rdflib.namespace import RDF
|
|
from rdflib.term import (
|
|
BNode,
|
|
Literal,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
VALID_SUBJECT = Union[URIRef, BNode]
|
|
ANYDICT = dict[str, Any]
|
|
|
|
|
|
@dataclass
|
|
class Pointer:
|
|
obj: str
|
|
|
|
|
|
def from_rdf_graph(
|
|
self,
|
|
graph: Graph,
|
|
schemaview: SchemaView,
|
|
target_class: type[Union[BaseModel, YAMLRoot]],
|
|
prefix_map: Union[dict[str, str], Converter, None] = None,
|
|
cast_literals: bool = True,
|
|
allow_unprocessed_triples: bool = True,
|
|
ignore_unmapped_predicates: bool = False,
|
|
) -> list[Union[BaseModel, YAMLRoot]]:
|
|
"""
|
|
Loads objects from graph into lists of the python target_class structure,
|
|
recursively walking RDF graph from instances of target_class.
|
|
|
|
:param graph: rdflib Graph that holds instances of target_class
|
|
:param schemaview: schema to which graph conforms
|
|
:param target_class: class which root nodes should instantiate
|
|
:param prefix_map: additional prefix mappings for data objects
|
|
:param ignore_unmapped_predicates: if True then a predicate that has no mapping to a slot does not raise an error
|
|
:return: all instances of target class type
|
|
"""
|
|
namespaces = schemaview.namespaces()
|
|
uri_to_class_map = {}
|
|
for cn, c in schemaview.all_classes().items():
|
|
uri = schemaview.get_uri(c, expand=True)
|
|
if uri in uri_to_class_map:
|
|
c2 = uri_to_class_map[uri]
|
|
if c2.name in schemaview.class_ancestors(cn):
|
|
continue
|
|
else:
|
|
logger.error(f'Inconsistent URI to class map: {uri} -> {c2.name}, {c.name}')
|
|
uri_to_class_map[uri] = c
|
|
# data prefix map: supplements or overrides existing schema prefix map
|
|
if isinstance(prefix_map, Converter):
|
|
# TODO replace with `prefix_map = prefix_map.bimap` after making minimum requirement on python 3.8
|
|
prefix_map = {record.prefix: record.uri_prefix for record in prefix_map.records}
|
|
if prefix_map:
|
|
for k, v in prefix_map.items():
|
|
namespaces[k] = v
|
|
graph.namespace_manager.bind(k, URIRef(v))
|
|
# Step 1: Create stub root dict-objects
|
|
target_class_uriref: URIRef = target_class.class_class_uri
|
|
root_dicts: list[ANYDICT] = []
|
|
root_subjects: list[VALID_SUBJECT] = list(graph.subjects(RDF.type, target_class_uriref))
|
|
logger.debug(f'ROOTS = {root_subjects}')
|
|
# Step 2: walk RDF graph starting from root subjects, constructing dict tree
|
|
node_tuples_to_visit: list[tuple[VALID_SUBJECT, ClassDefinitionName]] ## nodes and their type still to visit
|
|
node_tuples_to_visit = [(subject, target_class.class_name) for subject in root_subjects]
|
|
uri_to_slot: dict[str, SlotDefinition] ## lookup table for RDF predicates -> slots
|
|
uri_to_slot = {URIRef(schemaview.get_uri(s, expand=True)): s for s in schemaview.all_slots().values()}
|
|
processed: set[VALID_SUBJECT] = set() ## track nodes already visited, or already scheduled
|
|
for n, _ in node_tuples_to_visit:
|
|
processed.add(n)
|
|
obj_map: dict[VALID_SUBJECT, ANYDICT] = {} ## map from an RDF node to its dict representation
|
|
unmapped_predicates = set()
|
|
processed_triples: set[tuple] = set()
|
|
while len(node_tuples_to_visit) > 0:
|
|
subject, subject_class = node_tuples_to_visit.pop()
|
|
processed.add(subject)
|
|
dict_obj = self._get_id_dict(subject, schemaview, subject_class)
|
|
if subject in root_subjects:
|
|
root_dicts.append(dict_obj)
|
|
obj_map[subject] = dict_obj
|
|
type_designator_slot = schemaview.get_type_designator_slot(subject_class)
|
|
if type_designator_slot:
|
|
td_iri = schemaview.get_uri(type_designator_slot, expand=True)
|
|
type_vals = list(graph.objects(subject, URIRef(td_iri)))
|
|
if len(type_vals) > 0:
|
|
type_classes = [uri_to_class_map[str(x)] for x in type_vals]
|
|
if len(type_classes) > 1:
|
|
raise ValueError(f'Ambiguous types for {subject} == {type_classes}')
|
|
logger.info(f'Replacing {subject_class} with {type_classes}')
|
|
subject_class = type_classes[0].name
|
|
# PATCH >>>>>
|
|
if type_classes[0].class_uri is not None:
|
|
dict_obj[type_designator_slot.name] = type_classes[0].class_uri
|
|
# PATCH <<<<<
|
|
# process all triples for this node
|
|
for (_, p, o) in graph.triples((subject, None, None)):
|
|
processed_triples.add((subject,p,o))
|
|
logger.debug(f' Processing triple {subject} {p} {o}, subject type = {subject_class}')
|
|
if p == RDF.type:
|
|
logger.debug(f'Ignoring RDF.type for {subject} {o}, we automatically infer this from {subject_class}')
|
|
elif p not in uri_to_slot:
|
|
if ignore_unmapped_predicates:
|
|
unmapped_predicates.add(p)
|
|
else:
|
|
raise MappingError(f'No pred for {p} {type(p)}')
|
|
else:
|
|
slot = schemaview.induced_slot(uri_to_slot[p].name, subject_class)
|
|
range_applicable_elements = schemaview.slot_applicable_range_elements(slot)
|
|
is_inlined = schemaview.is_inlined(slot)
|
|
slot_name = underscore(slot.name)
|
|
if isinstance(o, Literal):
|
|
if EnumDefinition.class_name in range_applicable_elements:
|
|
logger.debug(f'Assuming no meaning assigned for value {o} for Enum {slot.range}')
|
|
elif TypeDefinition.class_name not in range_applicable_elements:
|
|
raise ValueError(f'Cannot map Literal {o} to a slot {slot.name} whose range {slot.range} is not a type;')
|
|
v = o.value
|
|
elif isinstance(o, BNode):
|
|
if not is_inlined:
|
|
logger.error(f'blank nodes should be inlined; {slot_name}={o} in {subject}')
|
|
v = Pointer(o)
|
|
else:
|
|
if ClassDefinition.class_name in range_applicable_elements:
|
|
if slot.range in schemaview.all_classes():
|
|
id_slot = schemaview.get_identifier_slot(slot.range)
|
|
v = self._uri_to_id(o, id_slot, schemaview)
|
|
else:
|
|
v = namespaces.curie_for(o)
|
|
if v is None:
|
|
logger.debug(f'No CURIE for {p}={o} in {subject} [{subject_class}]')
|
|
v = str(o)
|
|
elif EnumDefinition.class_name in range_applicable_elements:
|
|
range_union_elements = schemaview.slot_range_as_union(slot)
|
|
enum_names = [e for e in range_union_elements if e in schemaview.all_enums()]
|
|
# if a PV has a meaning URI declared, map this
|
|
# back to a text representation
|
|
v = namespaces.curie_for(o)
|
|
for enum_name in enum_names:
|
|
e = schemaview.get_enum(enum_name)
|
|
for pv in e.permissible_values.values():
|
|
if v == pv.meaning or str(o) == pv.meaning:
|
|
v = pv.text
|
|
break
|
|
elif TypeDefinition.class_name in range_applicable_elements:
|
|
if cast_literals:
|
|
v = namespaces.curie_for(o)
|
|
if v is None:
|
|
v = str(o)
|
|
logger.debug(f'Casting {o} to string')
|
|
else:
|
|
raise ValueError(f'Expected literal value ({range_applicable_elements}) for {slot_name}={o}')
|
|
if is_inlined:
|
|
# the object of the triple may not yet be processed;
|
|
# we store a pointer to o, and then replace this later
|
|
v = Pointer(o)
|
|
if slot.multivalued:
|
|
if slot_name not in dict_obj:
|
|
dict_obj[slot_name] = []
|
|
dict_obj[slot_name].append(v)
|
|
else:
|
|
dict_obj[slot_name] = v
|
|
if o not in processed:
|
|
# if o instantiates a class, add to list of nodes to be visited.
|
|
# force type based on range constraint
|
|
if slot.range in schemaview.all_classes():
|
|
node_tuples_to_visit.append((o, ClassDefinitionName(slot.range)))
|
|
if unmapped_predicates:
|
|
logger.info(f'Unmapped predicated: {unmapped_predicates}')
|
|
unprocessed_triples = set(graph.triples((None, None, None))) - processed_triples
|
|
logger.info(f'Triple processed = {len(processed_triples)}, unprocessed = {len(unprocessed_triples)}')
|
|
if len(unprocessed_triples) > 0:
|
|
if not allow_unprocessed_triples:
|
|
for t in unprocessed_triples:
|
|
logger.warning(f' Unprocessed: {t}')
|
|
raise ValueError(f'Unprocessed triples: {len(unprocessed_triples)}')
|
|
# Step 2: replace inline pointers with object dicts
|
|
def repl(v):
|
|
if isinstance(v, Pointer):
|
|
v2 = obj_map[v.obj]
|
|
if v2 is None:
|
|
raise Exception(f'No mapping for pointer {v}')
|
|
return v2
|
|
else:
|
|
return v
|
|
objs_to_visit: list[ANYDICT] = copy(root_dicts)
|
|
while len(objs_to_visit) > 0:
|
|
obj = objs_to_visit.pop()
|
|
logger.debug(f'Replacing pointers for {obj}')
|
|
for k, v in obj.items():
|
|
if v is None:
|
|
continue
|
|
if isinstance(v, list):
|
|
v = [repl(v1) for v1 in v if v1 is not None]
|
|
for v1 in v:
|
|
if isinstance(v1, dict):
|
|
objs_to_visit.append(v1)
|
|
else:
|
|
v = repl(v)
|
|
if isinstance(v, dict):
|
|
objs_to_visit.append(v)
|
|
obj[k] = v
|
|
# Final step: translate dicts into instances of target_class
|
|
return [target_class(**x) for x in root_dicts]
|
|
|
|
|
|
mod = import_module('linkml_runtime.loaders.rdflib_loader')
|
|
mod.RDFLibLoader.from_rdf_graph = from_rdf_graph
|