436 lines
15 KiB
Python
436 lines
15 KiB
Python
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = [
|
|
# "bidict",
|
|
# "rich-click",
|
|
# "lxml",
|
|
# "requests_cache",
|
|
# ]
|
|
# ///
|
|
|
|
import json
|
|
from urllib.parse import urljoin
|
|
from pathlib import Path
|
|
import re
|
|
import warnings
|
|
|
|
from bidict import bidict
|
|
from lxml import html
|
|
from requests_cache import CachedSession
|
|
import rich_click as click
|
|
|
|
|
|
def consult_rules(license_uri: str, rules: list[dict]) -> str | None:
    """Match a license URI against Rule records.

    Tries to match the given license URL against PIDs (expanded from
    CURIE to URI using hardcoded prefixes) or exact mappings; returns
    the matching rule's PID, or None when nothing matches.
    """
    prefix_map = {
        "obo": "http://purl.obolibrary.org/obo/",
        "spdxlic": "https://spdx.org/licenses/",
    }

    # trailing slashes are ignored on both sides of the comparison
    needle = license_uri.rstrip("/")

    for rule in rules:
        # PIDs are assumed to need CURIE expansion; exact mappings are
        # taken as-is (modulo a trailing slash)
        candidates = {expand_curie(rule["pid"], prefix_map)}
        candidates.update(m.rstrip("/") for m in rule.get("exact_mappings", []))
        if needle in candidates:
            return rule["pid"]
    return None
|
|
|
|
|
|
def csl_abstract(d: dict) -> str | None:
    """Get abstract from csl.

    Some abstracts seen in the wild are marked up with JATS tags, and
    the top level may include (a combination of) sections, titles and
    paragraphs (usually, a section itself contains a title and one
    paragraph). When only such tags appear at the top level, render
    them via jats2md; otherwise strip all tags and return the plain
    text content. Returns None for a missing or empty abstract.
    """
    abstract = d.get("abstract")
    if not abstract:
        return None

    tree = html.fromstring(abstract)
    top_level_tags = {child.tag for child in tree}
    if top_level_tags <= {"jats:p", "jats:title", "jats:sec"}:
        return jats2md(tree)
    return tree.text_content()
|
|
|
|
|
|
def csl_license(d: dict) -> list:
    """Get license URLs from doi content-negotiation JSON.

    Keeps only "vor" ("version of record") licenses; the set
    comprehension deduplicates, just in case.
    """
    vor_urls = {
        entry["URL"]
        for entry in d.get("license", [])
        if entry["content-version"] == "vor"
    }
    return list(vor_urls)
|
|
|
|
|
|
def csl_publish_date(d: dict, allow_incomplete: bool = True) -> str | None:
|
|
"""Get one publication date out of csl"""
|
|
if "issued" in d:
|
|
date = d["issued"]["date-parts"]
|
|
elif "published-online" in d:
|
|
date = d["published-online"]["date-parts"]
|
|
else:
|
|
return None
|
|
|
|
# partial date, a nested array of numbers
|
|
if len(date[0]) == 1 or (len(date[0]) < 3 and not allow_incomplete):
|
|
isodate = f"{date[0][0]}" # yyyy (only year is required)
|
|
elif len(date[0]) == 2:
|
|
isodate = f"{date[0][0]}-{date[0][1]:02}" # yyyy-mm
|
|
else:
|
|
isodate = f"{date[0][0]}-{date[0][1]:02}-{date[0][2]:02}" # yyyy-mm-dd
|
|
|
|
return isodate
|
|
|
|
|
|
def discover_authors(
    publication: dict, known_people: bidict[str, str], citeproc_record: dict
) -> list[dict]:
    """Find citeproc authors with known ORCIDs not yet attributed.

    Cross-references authors of the citeproc record against the
    PID<->ORCID mapping and against the publication's existing
    attributions; returns new attribution dicts (object PID plus a
    role derived from the author's "sequence" field) for authors that
    are known but not yet declared.
    """
    # ORCIDs already covered by the publication's attributions
    declared = {
        orcid
        for attribution in publication.get("attributed_to", [])
        if (orcid := known_people.get(attribution.get("object"))) is not None
    }

    discovered = []
    for author in citeproc_record.get("author", []):
        orcid = author.get("ORCID")
        # known_people.inverse maps ORCID -> PID, so membership there
        # is equivalent to checking known_people.values()
        if orcid is None or orcid not in known_people.inverse or orcid in declared:
            continue

        sequence = author.get("sequence")
        if sequence == "first":
            role = "obo:MS_1002034"  # first author
        elif sequence == "additional":
            role = "obo:MS_1002036"  # co-author
        else:
            role = "marcrel:aut"

        discovered.append({"object": known_people.inverse[orcid], "roles": [role]})

    return discovered
|
|
|
|
|
|
def expand_curie(curie: str, pmap: dict[str, str]) -> str:
    """Expand a CURIE into a URI using a prefix map.

    If there is no prefix, or the prefix is not defined in the prefix
    map, the input value is returned unchanged. This is a simple
    helper; for more complex use cases, consider the external curies
    package.
    """
    m = re.match(r"(?P<prefix>\w+):(?P<reference>.*)", curie)
    if m is None:
        return curie
    base = pmap.get(m["prefix"])
    if base is None:
        return curie
    return base + m["reference"]
|
|
|
|
|
|
def jats2md(span: html.HtmlElement, rstrip: bool = True) -> str:
    """Render a JATS-tagged abstract fragment as light Markdown.

    Walks the direct children of span: titles become inline lead-ins
    (suffixed with ": " unless they already end with "."), paragraphs
    are emitted followed by a blank line, sections recurse, and any
    other element contributes its bare text content.

    :param span: parsed element whose children carry jats:* tags
    :param rstrip: strip trailing whitespace from the final result;
        recursive calls pass False so paragraph separators survive
    """
    full_text = ""
    for elem in span:
        if elem.tag == "jats:title":
            # elem.text can be None when the title starts with child
            # markup; guard before lowercasing (was an AttributeError)
            if (elem.text or "").lower() != "abstract":
                # we know an abstract is an abstract - skip that title
                full_text += elem.text_content()
                full_text += ": " if not elem.text_content().endswith(".") else " "
        elif elem.tag == "jats:p":
            this_text = elem.text_content()
            for sub in elem:
                if sub.tag == "jats:ext-link":
                    # wrap at least plain links for unambiguous parsing by hugo
                    if (href := sub.get("xlink:href")) == sub.text_content():
                        this_text = this_text.replace(href, f"<{href}>")
            full_text += this_text
            full_text += "\n\n"
        elif elem.tag == "jats:sec":
            full_text += jats2md(elem, rstrip=False)
        else:
            full_text += elem.text_content()
    return full_text.rstrip() if rstrip else full_text
|
|
|
|
|
|
def pid_of(x: str | dict) -> str:
|
|
"""Return a PID of an object, inlined or not
|
|
|
|
A shortcut - makes a pid string or an inlined dict (where pid is a
|
|
property) equivalent. Does not do further validation, but it could
|
|
be added here.
|
|
|
|
"""
|
|
return x.get("pid", "") if isinstance(x, dict) else x
|
|
|
|
|
|
def process_doi(paper: dict) -> str | None:
    """Return a DOI from a paper's identifiers, or None.

    An identifier counts as a DOI when its creator PID equals the
    hardcoded ror:01fyxcz70 or its schema_type is dlthings:DOI; the
    first such identifier's notation is returned.
    """
    for identifier in paper.get("identifiers", []):
        by_creator = pid_of(identifier.get("creator")) == "ror:01fyxcz70"
        by_schema = identifier.get("schema_type") == "dlthings:DOI"
        if by_creator or by_schema:
            return identifier.get("notation")
    return None
|
|
|
|
|
|
def process_orcid(person: dict) -> str | None:
    """Return an ORCID from a person's identifiers, or None.

    Matches on the identifier creator PID being the hardcoded
    ror:04fa4r544; the first match's notation is returned.
    """
    for identifier in person.get("identifiers", []):
        if pid_of(identifier.get("creator")) != "ror:04fa4r544":
            continue
        return identifier.get("notation")
    return None
|
|
|
|
|
|
def publishing_process(d: dict) -> dict[str, str] | None:
    """Derive a publishing-process activity from citeproc metadata.

    Builds a dict with object obo:IAO_0000444, plus at_time (the
    publication date) and/or at_location (the first ISSN) when
    available. Returns None when neither detail could be derived.
    """
    details = {}

    pubdate = csl_publish_date(d)
    if pubdate is not None:
        details["at_time"] = pubdate

    issn = d.get("ISSN")
    if issn is not None:
        # there can be more than one (e.g. different for print / online);
        # if that's the case, use the 1st - we have no more data at hand
        details["at_location"] = f"ISSN:{issn[0]}"

    if not details:
        return None
    return {"object": "obo:IAO_0000444", **details}
|
|
|
|
|
|
def query_doi_citation(session: CachedSession, doi: str) -> str | None:
    """Fetch an APA-formatted citation for doi via content negotiation.

    Returns the citation text, or None on a failed request. Works
    around doi.org responses that look like UTF-8 but do not declare
    it in their headers.
    """
    url = urljoin("https://doi.org/", doi)
    response = session.get(url, headers={"Accept": "text/x-bibliography; style=apa"})
    if not response.ok:
        return None
    if response.encoding != response.apparent_encoding == "utf-8":
        # if it appears like utf-8, it likely is utf-8
        # see https://stackoverflow.com/questions/44203397/
        response.encoding = response.apparent_encoding
    return response.text
|
|
|
|
|
|
def query_doi_csl(session: CachedSession, doi: str) -> dict | None:
    """Fetch citeproc (CSL) JSON metadata for doi via content negotiation.

    Returns the parsed JSON dict, or None on a failed request.
    """
    # trailing slash on the base keeps this consistent with
    # query_doi_citation; urljoin resolves both forms to the same URL
    # for plain DOI suffixes
    doi_url = urljoin("https://doi.org/", doi)
    r = session.get(
        doi_url, headers={"Accept": "application/vnd.citationstyles.csl+json"}
    )
    return r.json() if r.ok else None
|
|
|
|
|
|
def remap_person_records(records: list[dict]) -> bidict[str, str]:
    """Create a bidirectional mapping of PIDs and ORCID URLs.

    Records without a discoverable ORCID are skipped; ORCID notations
    are expanded to full https://orcid.org/ URLs.
    """
    mapping = bidict()
    for record in records:
        orcid = process_orcid(record)
        if orcid is None:
            continue
        mapping[record["pid"]] = f"https://orcid.org/{orcid}"
    return mapping
|
|
|
|
|
|
def rules_from_citeproc(citeproc_record: dict, known_rules: list[dict]) -> list[str]:
    """Translate citeproc license URLs into known Rule PIDs (sorted)."""
    matched = [
        license_pid
        for url in csl_license(citeproc_record)
        if (license_pid := consult_rules(url, known_rules)) is not None
    ]
    return sorted(matched)
|
|
|
|
|
|
def short_name_from_citeproc(d: dict) -> str | None:
    """Generate file name based on citeproc data

    Combines last name of the first author, (short) container title,
    and date to form something that is human-readable and likely
    unique enough.

    Required properties are usually present, but they are not
    required, so we proceed only if we find all three (author,
    container title in some form, and issued date); otherwise
    returns None.

    """

    if not (
        "author" in d
        and ("container-title-short" in d or "container-title" in d)
        and "issued" in d
    ):
        return None

    # first author (et al)
    author = d["author"]
    if len(author) == 1:
        # family is required (at least in crossref) - define default to be safe
        author_part = author[0].get("family", "unknown")
    else:
        author_part = author[0].get("family", "unknown") + "_etal"

    # journal title (abbreviated)
    if container := d.get("container-title-short", False):
        journal_part = container.replace(" ", "_")
    elif ((container := d.get("container-title")) is not None) and container != []:
        # todo: iso4?
        # NOTE(review): assumes container-title is a string here; the
        # != [] guard suggests it may arrive as a list - confirm upstream
        journal_part = container.replace(" ", "_")
    else:
        # none of those are mandatory
        journal_part = d.get("group-title", "")
        institution = d.get("institution", [{}])[0].get("name")
        if institution == "bioRxiv":
            # "biorxiv-neuroscience" over "neuroscience"
            # fix: concatenate the variable, not the literal "journal_part"
            journal_part = institution + "-" + journal_part
        if journal_part == "":
            journal_part = "unknown"
    journal_part = re.sub(r"[^\w]", "", journal_part)  # keep alphanumerics

    # "issued" is guaranteed by the guard above, so the date is not None
    date_part = csl_publish_date(d).replace("-", "_")  # pyright:ignore

    return "_".join((author_part, journal_part, date_part)) + ".md"
|
|
|
|
|
|
@click.command()
@click.argument("input", type=click.File("rb"))
@click.argument("output", type=click.File("wt"))
@click.option(
    "--persons",
    type=click.File("rb"),
    help="Person records to discover authors (json lines).",
)
@click.option(
    "--rules",
    type=click.File("rb"),
    help="Rule records (json lines) to match licenses (json lines).",
)
@click.option(
    "--extras",
    is_flag=True,
    help="Add non-schema-compliant properties (starting with x_).",
)
def main(input, output, persons, rules, extras):
    """Enrich record with metadata fetched via doi.org

    Reads publication records from INPUT and outputs enriched records
    to OUTPUT. INPUT and OUTPUT should be in JSON lines format, and
    can be files or stdin / stdout (-).

    With --persons, authors in the retrieved metadata will be
    cross-referenced with the provided Person records based on ORCID,
    and added to contributors (requires ORCID to be present in both
    sources). With --rules, licenses will be translated by checking
    PIDs and exact mappings of the provided Rule records. Both
    arguments can use JSON lines files or stdin (-).

    Only the properties which are missing are updated (date is the
    exception, updated if more precise one is available).

    If --extras is specified, the produced record will contain
    properties which are not compatible with the research information
    schema, but can be useful for page generators (x_citation and
    x_suggested_name).

    Makes requests to doi.org (content negotiation) to fetch metadata
    (and, with --extras, also formatted citation). Uses caching to
    store requests in `$PWD/.cache` (valid for 2 hours).

    """

    # cache per Accept header too: the same DOI URL is requested with
    # different content types (csl+json vs formatted citation)
    session = CachedSession(
        ".cache/requests-cache/http_cache",
        backend="sqlite",
        match_headers=["Accept"],
        expire_after=7200,  # 2 hours, in seconds
    )

    # optional side inputs, both JSON lines
    all_people = [json.loads(line) for line in persons] if persons is not None else []
    all_rules = [json.loads(line) for line in rules] if rules is not None else []
    pid_orcid_map = remap_person_records(all_people)

    for line in input:
        paper = json.loads(line)
        doi = process_doi(paper)
        # citeproc metadata drives all enrichment below; citation text
        # is fetched only when --extras asked for it
        citeproc_metadata = query_doi_csl(session, doi) if doi is not None else None
        citation_text = (
            query_doi_citation(session, doi) if doi is not None and extras else None
        )

        if citation_text is not None:
            paper["x_citation"] = citation_text

        if citeproc_metadata is None:
            # nothing to do, emit unchanged
            click.echo(json.dumps(paper), output)
            continue

        # contributors: add known-but-undeclared authors (by ORCID)
        more_attributions = discover_authors(paper, pid_orcid_map, citeproc_metadata)
        if len(more_attributions) > 0:
            if "attributed_to" not in paper:
                paper["attributed_to"] = more_attributions
            else:
                paper["attributed_to"].extend(more_attributions)

        # publishing activity (date / ISSN)
        citeproc_pp = publishing_process(citeproc_metadata)
        activities = paper.get("generated_by", [])

        # find publishing process in publication (index of first match)
        pp_idx = None
        for i in range(len(activities)):
            if pid_of(activities[i].get("object")) == "obo:IAO_0000444": # Publishing process
                pp_idx = i
                break

        # update publishing activity (date & issn)
        if citeproc_pp is not None:
            if "generated_by" not in paper:
                # no activities so far: add a list
                paper["generated_by"] = [citeproc_pp]
            elif pp_idx is None:
                # activities but no publishing process: append
                paper["generated_by"].append(citeproc_pp)
            else:
                # activities incl. publishing process: merge keeping original values
                # (dict union: right operand wins, so existing keys survive)
                paper["generated_by"][pp_idx] = (
                    citeproc_pp | paper["generated_by"][pp_idx]
                )
                # override date if is more precise in citeproc
                # (precision measured as number of yyyy-mm-dd components)
                if len(citeproc_pp.get("at_time", "").split("-")) > len(
                    paper["generated_by"][pp_idx].get("at_time", "").split("-")
                ):
                    paper["generated_by"][pp_idx]["at_time"] = citeproc_pp["at_time"]

        # title: fill only when missing
        if paper.get("title") is None and citeproc_metadata.get("title") is not None:
            paper["title"] = citeproc_metadata.get("title")

        # abstract: fill only when missing
        if (
            paper.get("description") is None
            and (citeproc_abstract := csl_abstract(citeproc_metadata)) is not None
        ):
            paper["description"] = citeproc_abstract

        # rules (licenses): fill only when missing
        if paper.get("rules") is None:
            citeproc_rules = rules_from_citeproc(citeproc_metadata, all_rules)
            if len(citeproc_rules) > 0:
                paper["rules"] = citeproc_rules

        # suggested output file name (extras only)
        if extras and (sn := short_name_from_citeproc(citeproc_metadata)) is not None:
            paper["x_suggested_name"] = sn

        click.echo(json.dumps(paper), output)
|
|
|
|
|
|
# script entry point; click handles argument parsing
if __name__ == "__main__":
    main()
|