add registration tool for shacl-vue file uploads #17

Merged
adina merged 1 commit from adina/tools:register into main 2026-02-05 13:11:21 +00:00

280
code/register-upload.py Normal file
View file

@ -0,0 +1,280 @@
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "datalad-core @ git+https://hub.datalad.org/datalad/datalad-core@minilad",
# "dump-things-pyclient @ https://hub.psychoinformatics.de/datalink/dump-things-pyclient.git",
# "git-annex",
# "rich",
# "rich-click",
# ]
# ///
import json
import sys
import rich_click as click
from os import environ
from pathlib import Path
from subprocess import run as subprocess_run
from urllib.parse import (
urlparse,
urlunparse,
)
from datalad_core.git_utils import apply_changeset
from datalad_core.repo import Repo
from datalad_core.runners import call_git
from datalad_core.clone import clone as do_clone
from datalad_core.clone import AnnexInitMode
from dump_things_pyclient.communicate import (
collection_read_records_of_class,
collection_read_record_with_pid,
)
# helper functions for file name transformations
def _transform_chars_to_dash(text: str) -> str:
sanitized_text = text.replace(':', '-').replace('/', '-')
return sanitized_text
class AnnexRegistrator:
    """Register annex keys of uploaded files as named files in a repository.

    Discovers unused annex keys, queries a dump-things service for the
    matching file and distribution records, and commits annex pointer files
    under ``attachments/<schema-type>/<pid>/<title>``.
    """

    def __init__(
        self,
        repo: 'Repo',
        pidprefix: str,
        dtc_api_url: str,
        dtc_collection: str,
    ) -> None:
        """
        :param repo: Repository to register uploaded files in
        :param pidprefix: Prefix used to construct record PIDs from annex keys
        :param dtc_api_url: Service URL of the dump-things deployment
        :param dtc_collection: Collection in which records are queried
        """
        self.repo = repo
        self.pidprefix = pidprefix
        # maps annex key -> {'title': ..., 'schema': ..., 'pid': ...}
        self.mapping = {}
        self.dtc_api_url = dtc_api_url
        self.dtc_collection = dtc_collection

    def _get_pointer_file_location(
        self,
        info: dict,
        pid_transformation: 'callable | None' = None,
    ) -> str:
        """
        Compute the target path for a file in the repository.

        # attachments/
        #   schema-type/
        #     pid/
        #       title.ext

        :param info: A dictionary with 'schema', 'pid', and 'title' keys
        :param pid_transformation: A function to perform string substitution
          to sanitize paths; defaults to replacing ':' and '/' with '-'
        """
        if pid_transformation is None:
            pid_transformation = _transform_chars_to_dash
        return (
            f'attachments/{pid_transformation(info["schema"])}/'
            f'{pid_transformation(info["pid"])}/'
            f'{pid_transformation(info["title"])}'
        )

    def register(self) -> None:
        """
        Assemble the changeset and commit. The changeset is a path - content
        keypair with the path being the document title in its target directory
        (e.g., attachments/PDF/<pid>/CV.pdf), and the content is an annex
        pointer file content containing the document key.
        """
        self.assemble_metadata()
        if not self.mapping:
            # no new uploads, nothing to do
            # TODO log this properly
            print('nothing to do, no metadata found')
            return
        changeset = {}
        for key, info in self.mapping.items():
            # pointer file content for an annexed file referencing its key
            target = f'/annex/objects/{key}\n'
            link = self._get_pointer_file_location(info)
            changeset[link] = target
        # commit all files at once
        apply_changeset(
            self.repo,
            changeset,
            message="File upload via CI"
        )

    def assemble_metadata(self):
        """Retrieve record of file uploads and associated metadata.

        Populates ``self.mapping`` with one entry per unused annex key for
        which both a file record and a distribution record can be found.
        """
        # step 1: look up unused files
        keys = self.findunused()
        # step 2: build metadata for each key
        for key in keys:
            pid = self._construct_pid(key)
            # step 3: get file record by pid, follow its distribution link
            file_record = self._lookup_record_by_pid(pid=pid)
            if not file_record:
                # there is no matching file record under this url and collection
                # TODO: log this!
                print(f'no file record for pid {pid} '
                      f'in collection {self.dtc_collection} '
                      f'at {self.dtc_api_url}.')
                continue
            distribution_pid = file_record.get('distribution_of', None)
            if not distribution_pid:
                # the file record carries no distribution reference; report
                # the file record's pid (distribution_pid is empty here)
                print(f'no distribution reference in file record for pid {pid} '
                      f'in collection {self.dtc_collection} '
                      f'at {self.dtc_api_url}.')
                # TODO: try other collections?
                continue
            dist_record = self._lookup_record_by_pid(distribution_pid)
            if not dist_record:
                # the referenced distribution record could not be retrieved
                print(f'no distribution record for pid {distribution_pid} '
                      f'in collection {self.dtc_collection} '
                      f'at {self.dtc_api_url}.')
                continue
            # TODO: change keys/placeholder later once structure of records is
            # known
            self.mapping[key] = {
                'title': dist_record.get('display_label', 'placeholder'),
                'schema': dist_record.get('schema_type', 'placeholder'),
                'pid': dist_record.get('pid', 'placeholder'),
            }

    def findunused(self) -> list:
        """Find and report all unused keys in repo.

        :returns: list of annex keys not referenced by any branch
        """
        annex_cmd = [
            '-C',
            str(self.repo.path),
            'annex',
            'unused',
            '-f',
            'origin',
            '--json',
        ]
        out = call_git(annex_cmd, capture_output=True, text=True)
        data = json.loads(out)
        # NOTE(review): assumes a single JSON object with an 'unused-list'
        # mapping of number -> key — TODO confirm against git-annex output
        return list(data['unused-list'].values())

    def _construct_pid(
        self,
        key: str,
        prefix: 'str | None' = None,
    ) -> str:
        """Construct the record PID for an annex key.

        :param key: annex key of an uploaded file
        :param prefix: PID prefix; defaults to the prefix this instance was
          configured with (previously hard-coded to 'dldi', which silently
          ignored ``self.pidprefix``)
        """
        if prefix is None:
            prefix = self.pidprefix
        return f'{prefix}:{key}'

    def _lookup_record_by_pid(
        self,
        pid: str,
    ) -> 'dict | None':
        """Query the dump-things service for the record with *pid*.

        Requires a DTC_TOKEN environment variable for authentication.
        """
        record = collection_read_record_with_pid(
            service_url=self.dtc_api_url,
            collection=self.dtc_collection,
            pid=pid,
            token=environ['DTC_TOKEN']
        )
        return record
# https://hub.datalad.org/forgejo/datalad-clone-action/src/branch/main/entrypoint.py
def get_clone_url() -> str:
    """Build the (possibly token-authenticated) clone URL from the CI env.

    Reads INPUT_SERVER_URL (falling back to FORGEJO_SERVER_URL) and
    INPUT_REPOSITORY (falling back to FORGEJO_REPOSITORY). An INPUT_TOKEN —
    or FORGEJO_TOKEN, when cloning from the Forgejo instance executing the
    action — is embedded as userinfo in the URL.

    :returns: clone URL string
    :raises RuntimeError: if no server URL or repository is configured
    """
    serverurl = environ.get('INPUT_SERVER_URL') or environ.get('FORGEJO_SERVER_URL')
    if not serverurl:
        raise RuntimeError('No "serverurl"')
    serverurl_p = urlparse(serverurl)
    repo = environ.get('INPUT_REPOSITORY') or environ.get('FORGEJO_REPOSITORY')
    if not repo:
        # explicit raise: `assert` would be stripped under `python -O`
        raise RuntimeError('No repository identified')
    auth_token = environ.get("INPUT_TOKEN")
    # use .get() throughout: FORGEJO_* vars are absent when the script runs
    # outside a Forgejo action with only INPUT_* configured
    if auth_token is None \
            and environ.get('FORGEJO_SERVER_URL') == serverurl:
        auth_token = environ.get('FORGEJO_TOKEN')
    # auth-url: token goes into the userinfo part of the netloc
    cloneurl = urlunparse(serverurl_p._replace(
        netloc=f'{auth_token}@{serverurl_p.netloc}'
        if auth_token else serverurl_p.netloc,
        path=f'{serverurl_p.path}/{repo}.git'
        if serverurl_p.path else f'{repo}.git'
    ))
    return cloneurl
def run(args, **kwargs) -> None:
    """Announce *args* on stderr, then execute it via ``subprocess.run``."""
    sys.stderr.write(f'RUN: {args!r}\n')
    subprocess_run(args, **kwargs)
def get_dest_path() -> str:
    """Determine (and, for a configured path, create) the clone destination.

    When INPUT_PATH is set, the destination lives under FORGEJO_WORKSPACE
    (default '.') and is created via ``mkdir -p``; otherwise a generic
    'uploads' directory name is returned.
    """
    configured = environ.get('INPUT_PATH')
    if not configured:
        # generic name. Can't be ., git-lad would not clone there
        return 'uploads'
    workspace = environ.get('FORGEJO_WORKSPACE', '.')
    dest = f"{workspace}/{configured}"
    run(['mkdir', '-p', dest])
    return dest
def clone() -> str:
    """Clone the upload repository (bare, private) and initialize its annex.

    :returns: local path of the clone
    """
    dest = get_dest_path()
    do_clone(
        repository=get_clone_url(),
        directory=dest,
        bare=True,
        private=True,
        annex_init=AnnexInitMode.FULL,
    )
    call_git(['-C', dest, 'annex', 'init'])
    return dest
def push(repo_path: str) -> None:
    """Push the 'main' branch of the repository at *repo_path* to 'origin'."""
    call_git(['-C', repo_path, 'push', 'origin', 'main'])
@click.command()
@click.option('--dtc-api-url', '-a', default='https://pool.v0.trr379.de/api')
@click.option('--dtc-collection', '-c', default='public')
@click.option('--dtc-pidprefix', '-p', default='dldi')
def main(
    dtc_api_url: str = 'https://pool.v0.trr379.de/api',
    dtc_collection: str = 'public',
    dtc_pidprefix: str = 'dldi',
) -> None:
    """
    Register file uploads performed via a shacl-vue editor into the target
    repository to associate the key present after upload with a file name in the
    worktree.

    Usage Notes:

    Provide three arguments:

    - service URL of the dumpthings deployment (e.g., https://pool.v0.trr379.de)

    - the collection in which records should be queried (e.g., public)

    - the pidprefix for the pid of uploaded files.

    Run the script using uv:

    > uv run register-upload.py -a <url> -c <collection> -p <prefix>

    This script is meant to be used in a CI Action within a repository that is
    the file upload backend of a shacl-vue/dumpthings-deployment.

    In your repository, configure an action secret holding a token with read
    permissions to the dumpthings-deployment of your choice, and expose it to
    this script as the DTC_TOKEN environment variable.
    """
    # explicit check: `assert` would be silently stripped under `python -O`
    if 'DTC_TOKEN' not in environ:
        raise RuntimeError(
            'DTC_TOKEN environment variable is required for record lookups')
    repo_path = clone()
    repo = Repo(Path(repo_path))
    # TODO: support multiple collections
    ar = AnnexRegistrator(
        repo=repo,
        dtc_api_url=dtc_api_url,
        dtc_collection=dtc_collection,
        pidprefix=dtc_pidprefix)
    ar.register()
    push(repo_path)


if __name__ == '__main__':
    main()