inbox-inspector/mini-curate.py
Michał Szczepanik 3d81fbabee Sort lists prior to diffing
Objects in all lists inside the object will be sorted (by their value if
they are strings, or by their pid, object, or notation value if they are
dicts). If none of the expected keys is present, the sorting will treat
all items as empty strings, and degrade to a no-op.

This makes the diff more stable, since it is not sensitive to random
changes of order (although lists are technically ordered, in our case
changes of ordering are most likely an artifact of conversion between
triples and JSON which happens in the background).

The sorting will be done unconditionally, but it would be easy to
introduce a flag to turn that on or off.
2026-03-30 13:49:48 +02:00

189 lines
5.5 KiB
Python

import os
import subprocess
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from pathlib import Path
import orjson
import rich_click as click
from graphtage import json as gtjson
from graphtage import printer as gtprinter
from rich.text import Text
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.widgets import Footer, Header, Label, ListItem, ListView, RichLog
def calc_diff(from_dict, to_dict):
    """Diff two JSON-like objects with graphtage.

    A graphtage tree is built for each argument and the result of
    diffing the ``from_dict`` tree against the ``to_dict`` tree is
    returned.  While that work runs, stdout and stderr are both
    redirected to ``os.devnull``.
    """
    # Something deep inside really tries to use tqdm; its output has to be
    # sent to a black hole to avoid crashing textual.
    with open(os.devnull, "w") as sink, \
            redirect_stdout(sink), \
            redirect_stderr(sink):
        source_tree = gtjson.build_tree(from_dict)
        target_tree = gtjson.build_tree(to_dict)
        return source_tree.diff(target_tree)
def diff_to_rich(diff) -> Text:
    """Render a graphtage diff as a rich ``Text`` object.

    The diff is printed with graphtage's default JSON formatter, using
    ANSI colors, into an in-memory buffer.  The buffered output is then
    converted with ``Text.from_ansi``.
    """
    with StringIO() as buffer:
        # Another IO dance, because graphtage really wants to print.
        with gtprinter.Printer(out_stream=buffer, ansi_color=True) as printer:
            formatter = gtjson.JSONFormatter.DEFAULT_INSTANCE
            formatter.print(printer, diff)
        # Collect everything written once the printer has been closed.
        rendered = buffer.getvalue()
    return Text.from_ansi(rendered)
def sort_lists(d: dict) -> None:
    """Sort, in place, every list stored directly as a value of ``d``.

    Each list value is replaced by a sorted copy, ordered by ``to_key``.
    Only the values of ``d`` itself are inspected; lists nested deeper
    are left untouched, and non-list values are not modified.

    A list whose sort keys cannot be ordered against each other (for
    example a mix of strings and numbers, or keys that are ``None`` or
    dicts) keeps its original order instead of raising.  Sorting is only
    a best-effort way to stabilise the diff and must not prevent a
    record from being loaded.
    """
    for name, value in d.items():
        if not isinstance(value, list):
            continue
        try:
            # Rebinding an existing key does not resize the dict, so this
            # is safe while iterating over it.
            d[name] = sorted(value, key=to_key)
        except TypeError:
            # Keys are not mutually comparable: degrade to a no-op and
            # leave this list in its original order.
            continue
def to_key(x):
    """Return the value used to order ``x`` when sorting a list.

    Anything that is not a dict is its own sort key.  For a dict, the
    value of the first key found among ``"pid"``, ``"object"`` and
    ``"notation"`` (checked in that order) is used.  A dict with none of
    these keys yields the empty string, so such dicts all compare equal.
    """
    if not isinstance(x, dict):
        return x
    for name in ("pid", "object", "notation"):
        if name in x:
            return x[name]
    # no "standard" key, treat all as equal
    return ""
def list_records(api_url, collection, inbox_label):
    """Fetch the records of one inbox through the ``dtc`` command line tool.

    Runs ``dtc auto-curate <api_url> <collection> --list-records
    --include <inbox_label>``, parses its standard output as JSON and
    returns the value stored under ``inbox_label``.  Every record is
    passed through ``sort_lists`` before being returned.
    """
    command = [
        "dtc",
        "auto-curate",
        api_url,
        collection,
        "--list-records",
        "--include",
        inbox_label,
    ]
    # NOTE(review): the exit status of dtc is not checked here; a failed
    # run is only noticed if its output is not valid JSON or lacks the
    # inbox key -- confirm that this is intended.
    completed = subprocess.run(command, capture_output=True)
    inboxes = orjson.loads(completed.stdout)
    # later, we might keep all inboxes
    records = inboxes[inbox_label]
    for entry in records:
        sort_lists(entry)
    return records
def get_record(api_url, collection, pid):
    """Fetch a single record by PID through the ``dtc`` command line tool.

    Runs ``dtc get-records <api_url> <collection> --pid <pid>`` in an
    environment copied from the current one, with ``DTC_TOKEN`` removed.
    The command's standard output is parsed as JSON.  If the parsed
    value is not ``None`` it is passed through ``sort_lists``; the
    parsed value is returned either way.
    """
    env = os.environ.copy()
    # Run without the token -- presumably so that the shared-space version
    # of the record is returned rather than the inbox one (TODO confirm).
    # The default keeps this from raising KeyError when DTC_TOKEN is unset.
    env.pop("DTC_TOKEN", None)
    sp = subprocess.run(
        [
            "dtc",
            "get-records",
            api_url,
            collection,
            "--pid",
            pid,
        ],
        env=env,
        capture_output=True,
    )
    record = orjson.loads(sp.stdout)
    if record is not None:
        sort_lists(record)
    return record
# Command line entry point: three positional arguments plus --kill-file.
@click.command()
@click.rich_config(help_config={"text_markup": "markdown"})
@click.argument("inbox_label")
@click.argument("service_url", envvar="DUMPTHINGS_APIURL")
@click.argument("collection", default="public")
@click.option(
    "--kill-file",
    default=Path("/tmp/pids_to_reject.txt"),
    type=click.Path(path_type=Path),
    help="File for rejected PIDs, can later be used with dtc delete-records.",
    show_default=True,
)
def main(inbox_label, service_url, collection, kill_file):
    """Read records from INBOX_LABEL inbox & diff them.
    This app will fetch the inbox on launch. Records from the inbox
    can be diffed to their "shared-space" counterparts; these will be
    fetched on-demand and cached in memory. Lists in the records will
    be sorted if possible prior to diffing, making the diffs more
    stable.
    The SERVICE_URL can be specified as DUMPTHINGS_APIURL env var. The
    COLLECTION is optional, and "public" by default.
    A DTC_TOKEN env var must be specified, allowing
    dump-things-pyclient to run in a subprocess.
    """
    # The docstring above doubles as the command's help text, so notes for
    # maintainers are kept in comments rather than added to it.
    inspector = InspectorApp(
        service_url=service_url,
        collection=collection,
        inbox_label=inbox_label,
        kill_file=kill_file,
    )
    inspector.run()
class InspectorApp(App):
    """Textual app for inspecting the records of one inbox.

    The left column lists the PIDs of the inbox records.  Selecting one
    shows, in the right column, the diff from its counterpart fetched
    with ``get_record`` to the inbox version.  The "k" key removes the
    highlighted record from the list and appends its PID to the kill
    file.
    """

    CSS_PATH = "mini-curate.tcss"
    BINDINGS = [("k", "kill_line", "Kill line")]

    def __init__(self, service_url, collection, inbox_label, kill_file):
        """Store the connection settings and create empty record stores."""
        super().__init__()
        self.api_url = service_url
        self.collection = collection
        self.inbox_label = inbox_label
        self.kill_file = kill_file
        # Mutable state is created per instance; as class attributes the
        # list and dict would be shared between all instances.
        # Inbox records, kept in the same order as the ListView items.
        self.inbox_records = []
        # Cache of records fetched with get_record, keyed by PID.
        self.public_records = {}

    def compose(self) -> ComposeResult:
        """Build the layout: header, footer, record list and diff log."""
        yield Header()
        yield Footer()
        with Horizontal():
            yield ListView(classes="leftColumn")
            yield RichLog(classes="rightColumn")

    def on_ready(self) -> None:
        """Fetch the inbox records and add one list item per record."""
        list_view = self.query_one(ListView)
        self.inbox_records = list_records(
            self.api_url, self.collection, self.inbox_label
        )
        for record in self.inbox_records:
            list_view.append(ListItem(Label(record["pid"])))

    def on_list_view_selected(self, event):
        """Show the diff for the selected inbox record.

        The counterpart record is fetched on first use and cached by PID
        (including a ``None`` result), so each PID is requested at most
        once.
        """
        inbox_record = self.inbox_records[event.index]
        pid = inbox_record["pid"]
        if pid not in self.public_records:
            self.public_records[pid] = get_record(self.api_url, self.collection, pid)
        diff = calc_diff(self.public_records[pid], inbox_record)
        rich_log = self.query_one(RichLog)
        rich_log.clear()
        rich_log.write(diff_to_rich(diff))

    def action_kill_line(self):
        """Remove the highlighted record and append its PID to the kill file.

        Does nothing when no list item is highlighted.
        """
        list_view = self.query_one(ListView)
        idx = list_view.index
        if idx is None:
            # Nothing is highlighted (e.g. the list is empty); indexing
            # the records with None would raise TypeError.
            return
        rich_log = self.query_one(RichLog)
        pid = self.inbox_records[idx]["pid"]
        # Pop from both containers so their indices stay aligned.
        list_view.pop(idx)
        self.inbox_records.pop(idx)
        rich_log.clear()
        rich_log.write(f"Killed ({idx}) {pid}")
        with self.kill_file.open("a") as fp:
            fp.write(f"{pid}\n")
# Run the command line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()