WIP: Start towards automatic file metadata ingestion from local datasets #19
1 changed files with 267 additions and 0 deletions
267
code/index-dataset-files.py
Normal file
267
code/index-dataset-files.py
Normal file
|
|
@ -0,0 +1,267 @@
|
|||
# /// script
|
||||
# requires-python = ">=3.12"
|
||||
# dependencies = [
|
||||
# "datalad-core @ git+https://hub.datalad.org/datalad/datalad-core@minilad",
|
||||
# "dump-things-pyclient @ https://hub.psychoinformatics.de/datalink/dump-things-pyclient.git",
|
||||
# "git-annex",
|
||||
# "rich",
|
||||
# "rich-click",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
import rich_click as click
|
||||
from os import environ
|
||||
from pathlib import Path
|
||||
from pprint import pprint
|
||||
|
||||
from datalad.api import Dataset
|
||||
from datalad_next.iter_collections import iter_annexworktree
|
||||
|
||||
from dump_things_pyclient.communicate import (
|
||||
collection_write_record,
|
||||
collection_read_record_with_pid,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
"""
|
||||
Brainstorming how this tool can work:
|
||||
|
||||
* Use script on a local BIDS dataset
|
||||
* Specify a subject
|
||||
"""
|
||||
|
||||
|
||||
script_pid = '12345'
|
||||
|
||||
bare_subject_rec = {
|
||||
'display_label': None,
|
||||
'kind': None,
|
||||
'schema_type': "xyzri:XYZSubject",
|
||||
'pid': None,
|
||||
'study': None
|
||||
}
|
||||
|
||||
|
||||
bare_study_rec = {
|
||||
"schema_type": "xyzri:XYZStudy",
|
||||
"pid": None,
|
||||
"title": None
|
||||
}
|
||||
|
||||
bare_file_rec = {
|
||||
"derived_from": [
|
||||
{
|
||||
"schema_type": "dlthings:Derivation",
|
||||
"object": None
|
||||
}
|
||||
],
|
||||
"display_label": None,
|
||||
"schema_type": "xyzri:XYZFile",
|
||||
"pid": None,
|
||||
"byte_size": None,
|
||||
}
|
||||
|
||||
|
||||
class FileRegistrator(object):
|
||||
def __init__(
|
||||
self,
|
||||
repo: Dataset,
|
||||
studypid: str,
|
||||
subject: str,
|
||||
subjectpid: str,
|
||||
dtc_api_url: str,
|
||||
dtc_collection: str,
|
||||
) -> None:
|
||||
self.repo = repo
|
||||
self.studypid = studypid
|
||||
self.subject = subject
|
||||
self.subjectpid = subjectpid
|
||||
self.dtc_api_url = dtc_api_url
|
||||
self.dtc_collection = dtc_collection
|
||||
|
||||
def check_existing_subject_record(
|
||||
self,
|
||||
pid):
|
||||
"""If there already is a subject record, we need to check if there are
|
||||
already files derived from it. If we find those, we can't create new
|
||||
records, but need to check old records"""
|
||||
old_rec = self._get_existing_record
|
||||
|
||||
def _get_existing_record(self,
|
||||
pid: str) -> dict | None:
|
||||
record = collection_read_record_with_pid(
|
||||
service_url=self.dtc_api_url,
|
||||
collection=self.dtc_collection,
|
||||
pid=pid,
|
||||
token=environ['DTC_TOKEN']
|
||||
)
|
||||
return record
|
||||
|
||||
def create_records(
|
||||
self,
|
||||
file_rec,
|
||||
) -> list :
|
||||
"""Create metadata records on all files"""
|
||||
# TODO: don't edit anything a human touched
|
||||
recs = []
|
||||
# list all files in the worktree
|
||||
for file in iter_annexworktree(Path(self.repo.path) / self.subject):
|
||||
file_rec['derived_from'][0]['object'] = self.subjectpid
|
||||
annexkey = file.annexkey
|
||||
file_rec['display_label'] = file.name.name
|
||||
generated = ['derived_from', 'display_label', 'pid']
|
||||
if annexkey is None:
|
||||
file_rec['pid'] = _construct_pid(key=file.gitsha)
|
||||
else:
|
||||
file_rec['pid'] = _construct_pid(key=annexkey)
|
||||
file_rec['byte_size'] = file.annexsize
|
||||
generated.extend(['byte_size'])
|
||||
# add annotation that a script provided this info
|
||||
file_rec = self.machine_prov(file_rec, generated)
|
||||
recs.append(file_rec)
|
||||
return recs
|
||||
|
||||
def machine_prov(
|
||||
self,
|
||||
rec: dict,
|
||||
generated: list
|
||||
) -> dict:
|
||||
prov = {'attributes': [
|
||||
{'predicate': 'http://purl.org/pav/importedFrom',
|
||||
'value': self.repo.path,
|
||||
'attributes': []
|
||||
}]}
|
||||
for value in generated:
|
||||
new = {'predicate': 'prov:generated',
|
||||
'value': value,
|
||||
'characterized_by': [{
|
||||
'predicate': 'prov:generated_by',
|
||||
'object': script_pid
|
||||
}]}
|
||||
prov['attributes'][0]['attributes'].append(new)
|
||||
rec.update(prov)
|
||||
return rec
|
||||
|
||||
def register(
|
||||
self,
|
||||
print_only: bool,
|
||||
):
|
||||
recs = self.create_records(file_rec=bare_file_rec)
|
||||
if print_only:
|
||||
pprint(recs)
|
||||
return
|
||||
#TODO: upload records to dumpthings
|
||||
|
||||
|
||||
def create_study_record(
|
||||
study_rec,
|
||||
title
|
||||
) -> (str, dict):
|
||||
"""Create a new study record, if it does not yet exist."""
|
||||
pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
|
||||
study_rec['pid'] = pid
|
||||
study_rec['title'] = title
|
||||
return pid, study_rec
|
||||
|
||||
|
||||
def create_subject_record(
|
||||
study_pid,
|
||||
subject_rec,
|
||||
label,
|
||||
kind="obo:NCBITaxon_9606", # human
|
||||
) -> (str, dict):
|
||||
pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
|
||||
subject_rec['pid'] = pid
|
||||
subject_rec['study'] = study_pid
|
||||
subject_rec['display_label'] = label
|
||||
subject_rec['kind'] = kind
|
||||
return pid, subject_rec
|
||||
|
||||
|
||||
def _construct_pid(
|
||||
key: str,
|
||||
prefix: str = 'dldi',
|
||||
) -> str:
|
||||
# take unused keys, construct pid programmatically
|
||||
pid = f'{prefix}:{key}'
|
||||
return pid
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option('--dtc-api-url', '-a', default='https://pool.psychoinformatics.de/api')
|
||||
@click.option('--dtc-collection', '-c', default=['public'], multiple=True)
|
||||
@click.option('--dataset', '-d')
|
||||
@click.option('--study-pid')
|
||||
@click.option('--sub')
|
||||
@click.option('--study-label')
|
||||
@click.option('--sub-pid')
|
||||
def main(
|
||||
dataset: str,
|
||||
sub: str,
|
||||
study_label: str,
|
||||
study_pid: str | None = None,
|
||||
sub_pid: str | None = None,
|
||||
dtc_api_url: str = 'https://pool.psychoinformatics.de/api',
|
||||
dtc_collection: str = 'public',
|
||||
) -> None:
|
||||
"""
|
||||
Clone a BIDS-conform DataLad dataset and, per subject, create file records
|
||||
for each included file.
|
||||
|
||||
Usage Notes:
|
||||
|
||||
Provide the following arguments:
|
||||
- --dtc-api-url / -a: service URL of the dumpthings deployment (e.g.,
|
||||
https://pool.psychoinformatics.de)
|
||||
- --dtc-collection / -c: the collection into which records should be curated
|
||||
(e.g., public).
|
||||
- --dataset / -d: local path of the dataset to index
|
||||
- --sub: within-dataset path to the subject directory to process, e.g.
|
||||
'sub-01'.
|
||||
- --study-pid (opt): Existing pid of the study the dataset belongs to. If
|
||||
not given, a new study record is created.
|
||||
- --study-label: A label to use for study and subject when creating new
|
||||
records.
|
||||
- --sub-pid (opt): Existing pid of the subject record to link. If not given,
|
||||
a new subject record will be created, using the --sub argument as a label
|
||||
|
||||
Run the script using uv:
|
||||
|
||||
> uv run TODO
|
||||
"""
|
||||
repo = Dataset(Path(dataset))
|
||||
if not study_pid:
|
||||
logging.info('Creating new study record')
|
||||
study_pid, study_rec = \
|
||||
create_study_record(study_rec=bare_study_rec,
|
||||
title=study_label)
|
||||
if not sub_pid:
|
||||
logging.info('Creating new subject record')
|
||||
sub_pid, subject_rec = \
|
||||
create_subject_record(study_pid=study_pid,
|
||||
subject_rec=bare_subject_rec,
|
||||
label=study_label + '-' + sub)
|
||||
# TODO list and submit these records
|
||||
ar = FileRegistrator(
|
||||
repo=repo,
|
||||
dtc_api_url=dtc_api_url,
|
||||
dtc_collection=dtc_collection,
|
||||
studypid=study_pid,
|
||||
subjectpid=sub_pid,
|
||||
subject=sub,
|
||||
)
|
||||
ar.register(
|
||||
print_only=True,
|
||||
)
|
||||
"""
|
||||
If print_only is set to True, records are not submitted, just displayed
|
||||
in stdout"""
|
||||
#assert "DTC_TOKEN" in environ
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue