WIP: Start towards automatic file metadata ingestion from local datasets #19
1 changed files with 267 additions and 0 deletions
267
code/index-dataset-files.py
Normal file
267
code/index-dataset-files.py
Normal file
|
|
@ -0,0 +1,267 @@
|
||||||
|
# /// script
|
||||||
|
# requires-python = ">=3.12"
|
||||||
|
# dependencies = [
|
||||||
|
# "datalad-core @ git+https://hub.datalad.org/datalad/datalad-core@minilad",
|
||||||
|
# "dump-things-pyclient @ https://hub.psychoinformatics.de/datalink/dump-things-pyclient.git",
|
||||||
|
# "git-annex",
|
||||||
|
# "rich",
|
||||||
|
# "rich-click",
|
||||||
|
# ]
|
||||||
|
# ///
|
||||||
|
|
||||||
|
import copy
import logging
import uuid
from os import environ
from pathlib import Path
from pprint import pprint

import rich_click as click
from datalad.api import Dataset
from datalad_next.iter_collections import iter_annexworktree
from dump_things_pyclient.communicate import (
    collection_read_record_with_pid,
    collection_write_record,
)
|
||||||
|
|
||||||
|
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
|
||||||
|
|
||||||
|
"""
Brainstorming how this tool can work:

* Run the script on a local BIDS dataset
* Specify a subject
"""
|
||||||
|
|
||||||
|
|
||||||
|
# PID written as the `prov:generated_by` object of machine-generated
# attributes.
# NOTE(review): '12345' looks like a placeholder -- confirm the real PID.
script_pid = "12345"
|
||||||
|
|
||||||
|
# Template for a subject record. The `None` slots (`display_label`, `kind`,
# `pid`, `study`) are the fields that create_subject_record() fills in.
bare_subject_rec = {
    "display_label": None,
    "kind": None,
    "schema_type": "xyzri:XYZSubject",
    "pid": None,
    "study": None,
}
|
||||||
|
|
||||||
|
|
||||||
|
# Template for a study record. `pid` and `title` are the fields that
# create_study_record() fills in.
bare_study_rec = {
    'schema_type': 'xyzri:XYZStudy',
    'pid': None,
    'title': None,
}
|
||||||
|
|
||||||
|
# Template for a file record. FileRegistrator.create_records() fills in
# `derived_from[0]['object']`, `display_label`, `pid` and, for annexed
# files, `byte_size`.
bare_file_rec = {
    'derived_from': [
        {
            'schema_type': 'dlthings:Derivation',
            'object': None,
        },
    ],
    'display_label': None,
    'schema_type': 'xyzri:XYZFile',
    'pid': None,
    'byte_size': None,
}
|
||||||
|
|
||||||
|
|
||||||
|
class FileRegistrator(object):
    """Build file metadata records for one subject directory of a dataset.

    Records are created for the items that ``iter_annexworktree`` reports
    below ``<repo.path>/<subject>``. Each record is linked to the subject
    PID and annotated with machine provenance.
    """

    def __init__(
        self,
        repo: Dataset,
        studypid: str,
        subject: str,
        subjectpid: str,
        dtc_api_url: str,
        dtc_collection: str,
    ) -> None:
        """Store the dataset, the study/subject identifiers and the service
        coordinates used by the other methods.

        Parameters
        ----------
        repo:
            Dataset to index; its ``path`` attribute is used.
        studypid:
            PID of the study record.
        subject:
            Within-dataset path of the subject directory, e.g. ``sub-01``.
        subjectpid:
            PID of the subject record the file records are derived from.
        dtc_api_url:
            Service URL passed to the dump-things client.
        dtc_collection:
            Collection name passed to the dump-things client.
        """
        self.repo = repo
        self.studypid = studypid
        self.subject = subject
        self.subjectpid = subjectpid
        self.dtc_api_url = dtc_api_url
        self.dtc_collection = dtc_collection

    def check_existing_subject_record(
        self,
        pid,
    ):
        """Fetch an already existing subject record.

        If there already is a subject record, we need to check if there are
        already files derived from it. If we find those, we can't create new
        records, but need to check old records.

        Only the lookup is implemented so far; the record (or whatever the
        client returns for an unknown PID) is handed back to the caller.
        """
        # The lookup has to be *called* with the PID; merely referencing the
        # bound method never contacts the service.
        old_rec = self._get_existing_record(pid)
        # TODO: inspect `old_rec` for files that are already derived from it
        return old_rec

    def _get_existing_record(
        self,
        pid: str,
    ) -> dict | None:
        """Read the record with the given PID from the configured collection.

        Raises
        ------
        KeyError
            If the ``DTC_TOKEN`` environment variable is not set.
        """
        record = collection_read_record_with_pid(
            service_url=self.dtc_api_url,
            collection=self.dtc_collection,
            pid=pid,
            token=environ['DTC_TOKEN'],
        )
        return record

    def create_records(
        self,
        file_rec,
    ) -> list:
        """Create metadata records on all files.

        Parameters
        ----------
        file_rec:
            Template record. It is copied for every file and never modified.

        Returns
        -------
        list
            One independent record dict per reported worktree item.
        """
        # TODO: don't edit anything a human touched
        recs = []
        # list all files in the worktree
        for file in iter_annexworktree(Path(self.repo.path) / self.subject):
            # Work on a private deep copy: the template contains a nested
            # list/dict (`derived_from`), and reusing one dict for all files
            # would make every list entry alias the last file processed.
            rec = copy.deepcopy(file_rec)
            rec['derived_from'][0]['object'] = self.subjectpid
            rec['display_label'] = file.name.name
            generated = ['derived_from', 'display_label', 'pid']
            annexkey = file.annexkey
            if annexkey is None:
                # not annexed: fall back to the git blob SHA as the key;
                # `byte_size` keeps the template value
                rec['pid'] = _construct_pid(key=file.gitsha)
            else:
                rec['pid'] = _construct_pid(key=annexkey)
                rec['byte_size'] = file.annexsize
                generated.append('byte_size')
            # add annotation that a script provided this info
            recs.append(self.machine_prov(rec, generated))
        return recs

    def machine_prov(
        self,
        rec: dict,
        generated: list,
    ) -> dict:
        """Annotate ``rec`` with machine provenance and return it.

        Sets ``rec['attributes']`` (replacing any existing value) to a single
        ``pav:importedFrom`` attribute whose value is the dataset path. That
        attribute carries one ``prov:generated`` entry per name in
        ``generated``, each characterized as generated by ``script_pid``.
        ``rec`` is modified in place.
        """
        generated_attrs = [
            {
                'predicate': 'prov:generated',
                'value': value,
                'characterized_by': [{
                    'predicate': 'prov:generated_by',
                    'object': script_pid,
                }],
            }
            for value in generated
        ]
        rec['attributes'] = [{
            'predicate': 'http://purl.org/pav/importedFrom',
            'value': self.repo.path,
            'attributes': generated_attrs,
        }]
        return rec

    def register(
        self,
        print_only: bool,
    ):
        """Create the file records and, if ``print_only`` is true, pretty-print
        them to stdout instead of submitting them.

        Submission is not implemented yet; with ``print_only`` false the
        records are created and discarded.
        """
        recs = self.create_records(file_rec=bare_file_rec)
        if print_only:
            pprint(recs)
            return
        # TODO: upload records to dumpthings
|
||||||
|
|
||||||
|
|
||||||
|
def create_study_record(
    study_rec,
    title
) -> tuple[str, dict]:
    """Create a new study record with a freshly minted PID.

    No lookup for an already existing study is performed here; callers that
    know the study PID should not call this function.

    Parameters
    ----------
    study_rec:
        Template mapping for a study record. It is left untouched.
    title:
        Value for the record's ``title`` field.

    Returns
    -------
    tuple[str, dict]
        The new PID (``xyzrins:<uuid4>``) and a new record dict holding the
        template's entries plus ``pid`` and ``title``.
    """
    pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
    # Build a new dict instead of writing into the template: the template is
    # a shared module-level object and must stay pristine for later calls.
    return pid, {**study_rec, 'pid': pid, 'title': title}
|
||||||
|
|
||||||
|
|
||||||
|
def create_subject_record(
    study_pid,
    subject_rec,
    label,
    kind="obo:NCBITaxon_9606",  # human
) -> tuple[str, dict]:
    """Create a new subject record with a freshly minted PID.

    Parameters
    ----------
    study_pid:
        PID of the study the subject belongs to; stored in ``study``.
    subject_rec:
        Template mapping for a subject record. It is left untouched.
    label:
        Value for the record's ``display_label`` field.
    kind:
        Value for the record's ``kind`` field; defaults to the NCBI taxon
        identifier for human.

    Returns
    -------
    tuple[str, dict]
        The new PID (``xyzrins:<uuid4>``) and a new record dict holding the
        template's entries plus ``pid``, ``study``, ``display_label`` and
        ``kind``.
    """
    pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
    # Build a new dict instead of writing into the template: the template is
    # a shared module-level object and must stay pristine for later calls.
    return pid, {
        **subject_rec,
        'pid': pid,
        'study': study_pid,
        'display_label': label,
        'kind': kind,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _construct_pid(
    key: str,
    prefix: str = 'dldi',
) -> str:
    """Return the PID ``<prefix>:<key>``.

    Parameters
    ----------
    key:
        Identifier part of the PID (e.g. an annex key or a UUID string).
    prefix:
        Namespace prefix placed before the colon; ``dldi`` by default.
    """
    # take unused keys, construct pid programmatically
    return '{}:{}'.format(prefix, key)
|
||||||
|
|
||||||
|
|
||||||
|
@click.command()
@click.option('--dtc-api-url', '-a', default='https://pool.psychoinformatics.de/api')
@click.option('--dtc-collection', '-c', default='public')
@click.option('--dataset', '-d', required=True)
@click.option('--study-pid')
@click.option('--sub', required=True)
@click.option('--study-label')
@click.option('--sub-pid')
def main(
    dataset: str,
    sub: str,
    study_label: str | None = None,
    study_pid: str | None = None,
    sub_pid: str | None = None,
    dtc_api_url: str = 'https://pool.psychoinformatics.de/api',
    dtc_collection: str = 'public',
) -> None:
    """
    Index a local BIDS-conform DataLad dataset and, per subject, create file
    records for each included file.

    Usage Notes:

    Provide the following arguments:
    - --dtc-api-url / -a: service URL of the dumpthings deployment (e.g.,
      https://pool.psychoinformatics.de/api)
    - --dtc-collection / -c: the collection into which records should be curated
      (e.g., public).
    - --dataset / -d: local path of the dataset to index
    - --sub: within-dataset path to the subject directory to process, e.g.
      'sub-01'.
    - --study-pid (opt): Existing pid of the study the dataset belongs to. If
      not given, a new study record is created.
    - --study-label: A label to use for study and subject when creating new
      records. Required unless both --study-pid and --sub-pid are given.
    - --sub-pid (opt): Existing pid of the subject record to link. If not given,
      a new subject record will be created, using the --sub argument as a label

    Run the script using uv:

    > uv run TODO
    """
    # Both record constructors below need the label; fail with a usage
    # message instead of a TypeError deep inside string concatenation.
    if not (study_pid and sub_pid) and not study_label:
        raise click.UsageError(
            '--study-label is required when a new study or subject record '
            'has to be created (i.e. --study-pid or --sub-pid is missing)'
        )

    repo = Dataset(Path(dataset))
    if not study_pid:
        logger.info('Creating new study record')
        study_pid, study_rec = create_study_record(
            study_rec=bare_study_rec,
            title=study_label,
        )
    if not sub_pid:
        logger.info('Creating new subject record')
        sub_pid, subject_rec = create_subject_record(
            study_pid=study_pid,
            subject_rec=bare_subject_rec,
            label=study_label + '-' + sub,
        )
    # TODO list and submit these records
    ar = FileRegistrator(
        repo=repo,
        dtc_api_url=dtc_api_url,
        dtc_collection=dtc_collection,
        studypid=study_pid,
        subjectpid=sub_pid,
        subject=sub,
    )
    # If print_only is set to True, records are not submitted, just displayed
    # in stdout
    ar.register(
        print_only=True,
    )
    # TODO: require DTC_TOKEN in the environment once records are submitted
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Launch the command-line interface only when run as a script, not on import.
if __name__ == "__main__":
    main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue