WIP: Start towards automatic file metadata ingestion from local datasets #19

Draft
adina wants to merge 4 commits from index-dataset-files.py into main

267
code/index-dataset-files.py Normal file
View file

@ -0,0 +1,267 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "datalad-core @ git+https://hub.datalad.org/datalad/datalad-core@minilad",
# "dump-things-pyclient @ https://hub.psychoinformatics.de/datalink/dump-things-pyclient.git",
# "git-annex",
# "rich",
# "rich-click",
# ]
# ///
import logging
import uuid
import rich_click as click
from os import environ
from pathlib import Path
from pprint import pprint
from datalad.api import Dataset
from datalad_next.iter_collections import iter_annexworktree
from dump_things_pyclient.communicate import (
collection_write_record,
collection_read_record_with_pid,
)
logger = logging.getLogger(__name__)
"""
Brainstorming how this tool can work:
* Use script on a local BIDS dataset
* Specify a subject
"""
script_pid = '12345'
bare_subject_rec = {
'display_label': None,
'kind': None,
'schema_type': "xyzri:XYZSubject",
'pid': None,
'study': None
}
bare_study_rec = {
"schema_type": "xyzri:XYZStudy",
"pid": None,
"title": None
}
bare_file_rec = {
"derived_from": [
{
"schema_type": "dlthings:Derivation",
"object": None
}
],
"display_label": None,
"schema_type": "xyzri:XYZFile",
"pid": None,
"byte_size": None,
}
class FileRegistrator(object):
def __init__(
self,
repo: Dataset,
studypid: str,
subject: str,
subjectpid: str,
dtc_api_url: str,
dtc_collection: str,
) -> None:
self.repo = repo
self.studypid = studypid
self.subject = subject
self.subjectpid = subjectpid
self.dtc_api_url = dtc_api_url
self.dtc_collection = dtc_collection
def check_existing_subject_record(
self,
pid):
"""If there already is a subject record, we need to check if there are
already files derived from it. If we find those, we can't create new
records, but need to check old records"""
old_rec = self._get_existing_record
def _get_existing_record(self,
pid: str) -> dict | None:
record = collection_read_record_with_pid(
service_url=self.dtc_api_url,
collection=self.dtc_collection,
pid=pid,
token=environ['DTC_TOKEN']
)
return record
def create_records(
self,
file_rec,
) -> list :
"""Create metadata records on all files"""
# TODO: don't edit anything a human touched
recs = []
# list all files in the worktree
for file in iter_annexworktree(Path(self.repo.path) / self.subject):
file_rec['derived_from'][0]['object'] = self.subjectpid
annexkey = file.annexkey
file_rec['display_label'] = file.name.name
generated = ['derived_from', 'display_label', 'pid']
if annexkey is None:
file_rec['pid'] = _construct_pid(key=file.gitsha)
else:
file_rec['pid'] = _construct_pid(key=annexkey)
file_rec['byte_size'] = file.annexsize
generated.extend(['byte_size'])
# add annotation that a script provided this info
file_rec = self.machine_prov(file_rec, generated)
recs.append(file_rec)
return recs
def machine_prov(
self,
rec: dict,
generated: list
) -> dict:
prov = {'attributes': [
{'predicate': 'http://purl.org/pav/importedFrom',
'value': self.repo.path,
'attributes': []
}]}
for value in generated:
new = {'predicate': 'prov:generated',
'value': value,
'characterized_by': [{
'predicate': 'prov:generated_by',
'object': script_pid
}]}
prov['attributes'][0]['attributes'].append(new)
rec.update(prov)
return rec
def register(
self,
print_only: bool,
):
recs = self.create_records(file_rec=bare_file_rec)
if print_only:
pprint(recs)
return
#TODO: upload records to dumpthings
def create_study_record(
study_rec,
title
) -> (str, dict):
"""Create a new study record, if it does not yet exist."""
pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
study_rec['pid'] = pid
study_rec['title'] = title
return pid, study_rec
def create_subject_record(
study_pid,
subject_rec,
label,
kind="obo:NCBITaxon_9606", # human
) -> (str, dict):
pid = _construct_pid(prefix='xyzrins', key=str(uuid.uuid4()))
subject_rec['pid'] = pid
subject_rec['study'] = study_pid
subject_rec['display_label'] = label
subject_rec['kind'] = kind
return pid, subject_rec
def _construct_pid(
key: str,
prefix: str = 'dldi',
) -> str:
# take unused keys, construct pid programmatically
pid = f'{prefix}:{key}'
return pid
@click.command()
@click.option('--dtc-api-url', '-a', default='https://pool.psychoinformatics.de/api')
@click.option('--dtc-collection', '-c', default=['public'], multiple=True)
@click.option('--dataset', '-d')
@click.option('--study-pid')
@click.option('--sub')
@click.option('--study-label')
@click.option('--sub-pid')
def main(
dataset: str,
sub: str,
study_label: str,
study_pid: str | None = None,
sub_pid: str | None = None,
dtc_api_url: str = 'https://pool.psychoinformatics.de/api',
dtc_collection: str = 'public',
) -> None:
"""
Clone a BIDS-conform DataLad dataset and, per subject, create file records
for each included file.
Usage Notes:
Provide the following arguments:
- --dtc-api-url / -a: service URL of the dumpthings deployment (e.g.,
https://pool.psychoinformatics.de)
- --dtc-collection / -c: the collection into which records should be curated
(e.g., public).
- --dataset / -d: local path of the dataset to index
- --sub: within-dataset path to the subject directory to process, e.g.
'sub-01'.
- --study-pid (opt): Existing pid of the study the dataset belongs to. If
not given, a new study record is created.
- --study-label: A label to use for study and subject when creating new
records.
- --sub-pid (opt): Existing pid of the subject record to link. If not given,
a new subject record will be created, using the --sub argument as a label
Run the script using uv:
> uv run TODO
"""
repo = Dataset(Path(dataset))
if not study_pid:
logging.info('Creating new study record')
study_pid, study_rec = \
create_study_record(study_rec=bare_study_rec,
title=study_label)
if not sub_pid:
logging.info('Creating new subject record')
sub_pid, subject_rec = \
create_subject_record(study_pid=study_pid,
subject_rec=bare_subject_rec,
label=study_label + '-' + sub)
# TODO list and submit these records
ar = FileRegistrator(
repo=repo,
dtc_api_url=dtc_api_url,
dtc_collection=dtc_collection,
studypid=study_pid,
subjectpid=sub_pid,
subject=sub,
)
ar.register(
print_only=True,
)
"""
If print_only is set to True, records are not submitted, just displayed
in stdout"""
#assert "DTC_TOKEN" in environ
if __name__ == '__main__':
main()