#!/usr/bin/env python3 from __future__ import annotations import argparse from datetime import datetime, timezone import html import json from pathlib import Path from typing import Any def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Aggregate run logs into control/video latency estimate outputs.") parser.add_argument("--run-dir", required=True, help="Run directory containing JSONL logs.") parser.add_argument("--output-dir", help="Output directory. Defaults to --run-dir.") return parser.parse_args() def iter_jsonl(path: Path) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] if not path.exists(): return records with path.open("r", encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError: continue if isinstance(payload, dict): records.append(payload) return records def load_glob_jsonl(run_dir: Path, pattern: str) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] for path in sorted(run_dir.glob(pattern)): records.extend(iter_jsonl(path)) return records def write_jsonl(path: Path, records: list[dict[str, Any]]) -> None: with path.open("w", encoding="utf-8") as handle: for record in records: handle.write(json.dumps(record, ensure_ascii=False, separators=(",", ":"))) handle.write("\n") def parse_unix_ms(value: Any) -> int | None: if value is None: return None if isinstance(value, (int, float)): return int(value) text = str(value).strip() if not text: return None if text.endswith("Z"): text = f"{text[:-1]}+00:00" try: return int(datetime.fromisoformat(text).astimezone(timezone.utc).timestamp() * 1000) except ValueError: return None def flatten_net_epoch(samples: list[dict[str, Any]]) -> list[dict[str, Any]]: flattened: list[dict[str, Any]] = [] for sample in samples: links = sample.get("links") or {} a_to_d = (links.get("a_to_d") or {}).get("sessions") or {} d_to_b = (links.get("d_to_b") or {}).get("sessions") or {} a_control = (a_to_d.get("control") or {}).get("kcp") or {} d_control = (d_to_b.get("control") or {}).get("kcp") or {} a_video = (a_to_d.get("video") or {}).get("kcp") or {} d_video = (d_to_b.get("video") or {}).get("kcp") or {} flattened.append( { "updated_at": sample.get("updated_at"), "a_to_d_control_srtt_ms": a_control.get("srtt_ms"), "a_to_d_control_min_srtt_ms": a_control.get("min_srtt_ms"), "d_to_b_control_srtt_ms": d_control.get("srtt_ms"), "d_to_b_control_min_srtt_ms": d_control.get("min_srtt_ms"), "a_to_d_video_srtt_ms": a_video.get("srtt_ms"), "a_to_d_video_min_srtt_ms": a_video.get("min_srtt_ms"), "d_to_b_video_srtt_ms": d_video.get("srtt_ms"), "d_to_b_video_min_srtt_ms": d_video.get("min_srtt_ms"), "a_to_d_control_feedback_age_ms": a_control.get("last_feedback_age_ms"), "d_to_b_control_feedback_age_ms": d_control.get("last_feedback_age_ms"), "a_to_d_video_feedback_age_ms": a_video.get("last_feedback_age_ms"), "d_to_b_video_feedback_age_ms": d_video.get("last_feedback_age_ms"), "a_to_d_control_retrans_delta": ((a_to_d.get("control") or {}).get("trend") or {}).get("retrans_delta"), "d_to_b_control_retrans_delta": ((d_to_b.get("control") or {}).get("trend") or {}).get("retrans_delta"), "a_to_d_video_retrans_delta": ((a_to_d.get("video") or {}).get("trend") or {}).get("retrans_delta"), "d_to_b_video_retrans_delta": ((d_to_b.get("video") or {}).get("trend") or {}).get("retrans_delta"), "a_to_d_video_window_pressure_pct": a_video.get("window_pressure_pct"), "d_to_b_video_window_pressure_pct": d_video.get("window_pressure_pct"), "robot_health": sample.get("robot_health"), } ) return flattened def aggregate_control_estimates( network_samples: list[dict[str, Any]], control_events: list[dict[str, Any]], control_acks: list[dict[str, Any]], ) -> list[dict[str, Any]]: if control_acks: return control_acks fallback: list[dict[str, Any]] = [] for sample in network_samples: estimate = sample.get("latency_estimate") or {} fallback.append( { "updated_at": sample.get("updated_at"), "estimate_method": "srtt_fallback", "control_loop_rtt_ms": estimate.get("control_loop_rtt_ms"), "control_to_persist_est_ms": estimate.get("control_to_persist_est_ms"), "control_oneway_srtt_est_ms": estimate.get("control_oneway_srtt_est_ms"), "control_oneway_bestcase_est_ms": estimate.get("control_oneway_bestcase_est_ms"), "source_event_count": len(control_events), } ) return fallback def aggregate_video_estimates( network_samples: list[dict[str, Any]], frame_recv_records: list[dict[str, Any]], display_probe_records: list[dict[str, Any]], ) -> list[dict[str, Any]]: network_timeline = sorted( ( (updated_at_ms, sample.get("latency_estimate") or {}) for sample in network_samples for updated_at_ms in [parse_unix_ms(sample.get("updated_at"))] if updated_at_ms is not None ), key=lambda item: item[0], ) probes_by_seq = { int(record["frame_seq"]): record for record in display_probe_records if record.get("frame_seq") is not None } estimates: list[dict[str, Any]] = [] timeline_index = 0 for record in frame_recv_records: frame_seq = record.get("frame_seq") if frame_seq is None: continue probe = probes_by_seq.get(int(frame_seq)) backend_received_unix_ns = record.get("backend_received_unix_ns") backend_received_unix_ms = None try: if backend_received_unix_ns is not None: backend_received_unix_ms = int(int(backend_received_unix_ns) / 1_000_000) except (TypeError, ValueError): backend_received_unix_ms = None latency_estimate: dict[str, Any] = {} if backend_received_unix_ms is not None and network_timeline: while timeline_index + 1 < len(network_timeline) and network_timeline[timeline_index + 1][0] <= backend_received_unix_ms: timeline_index += 1 if network_timeline[timeline_index][0] <= backend_received_unix_ms: latency_estimate = network_timeline[timeline_index][1] network_oneway = latency_estimate.get("video_network_oneway_est_ms") capture_to_send = record.get("b_side_capture_to_send_ms") partial_est = None if capture_to_send is not None or network_oneway is not None: partial_est = round(float(capture_to_send or 0.0) + float(network_oneway or 0.0), 3) request_to_paint_ms = None if probe is not None and probe.get("request_to_paint_ms") is not None: request_to_paint_ms = round(float(probe["request_to_paint_ms"]), 3) elif probe is not None and probe.get("request_started_unix_ms") is not None and probe.get("paint_unix_ms") is not None: request_to_paint_ms = round(float(probe["paint_unix_ms"]) - float(probe["request_started_unix_ms"]), 3) video_e2e_est_ms = round(partial_est + request_to_paint_ms, 3) if partial_est is not None and request_to_paint_ms is not None else None estimates.append( { "frame_seq": frame_seq, "backend_received_unix_ns": record.get("backend_received_unix_ns"), "frame_hash": record.get("frame_hash"), "estimate_method": "capture_to_send+srtt/2+request_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2", "video_network_oneway_est_ms": network_oneway, "b_side_capture_to_send_ms": capture_to_send, "request_to_paint_ms": request_to_paint_ms, "video_partial_est_ms": partial_est, "video_e2e_est_ms": video_e2e_est_ms, "sequence_gap": record.get("sequence_gap"), "repeat_flag": record.get("repeat_flag"), "sender_clock_delta_ms_raw": record.get("sender_clock_delta_ms_raw"), } ) return estimates def write_html_summary( path: Path, *, net_epochs: list[dict[str, Any]], control_estimates: list[dict[str, Any]], video_estimates: list[dict[str, Any]], ) -> None: latest_control = control_estimates[-1] if control_estimates else {} latest_video = video_estimates[-1] if video_estimates else {} latest_net = net_epochs[-1] if net_epochs else {} html_text = f""" Latency Estimates

Latency Estimates

Control

loop RTT: {html.escape(str(latest_control.get("control_loop_rtt_ms")))}

to persist: {html.escape(str(latest_control.get("control_to_persist_est_ms")))}

method: {html.escape(str(latest_control.get("estimate_method")))}

samples: {len(control_estimates)}

Video

network one-way: {html.escape(str(latest_video.get("video_network_oneway_est_ms")))}

partial: {html.escape(str(latest_video.get("video_partial_est_ms")))}

end-to-end: {html.escape(str(latest_video.get("video_e2e_est_ms")))}

samples: {len(video_estimates)}

Net Epoch

a→d control srtt: {html.escape(str(latest_net.get("a_to_d_control_srtt_ms")))}

d→b control srtt: {html.escape(str(latest_net.get("d_to_b_control_srtt_ms")))}

a→d video srtt: {html.escape(str(latest_net.get("a_to_d_video_srtt_ms")))}

d→b video srtt: {html.escape(str(latest_net.get("d_to_b_video_srtt_ms")))}

""" path.write_text(html_text, encoding="utf-8") def main() -> int: args = parse_args() run_dir = Path(args.run_dir).resolve() output_dir = Path(args.output_dir).resolve() if args.output_dir else run_dir output_dir.mkdir(parents=True, exist_ok=True) network_samples = load_glob_jsonl(run_dir, "a-network-summary.*.jsonl") control_events = load_glob_jsonl(run_dir, "a-control-events.*.jsonl") control_acks = load_glob_jsonl(run_dir, "a-control-acks.*.jsonl") frame_recv_records = load_glob_jsonl(run_dir, "a-video-frame-recv.*.jsonl") display_probe_records = load_glob_jsonl(run_dir, "a-video-display-probe.*.jsonl") net_epochs = flatten_net_epoch(network_samples) control_estimates = aggregate_control_estimates(network_samples, control_events, control_acks) video_estimates = aggregate_video_estimates(network_samples, frame_recv_records, display_probe_records) write_jsonl(output_dir / "net-epoch-summary.jsonl", net_epochs) write_jsonl(output_dir / "control-latency-estimates.jsonl", control_estimates) write_jsonl(output_dir / "video-latency-estimates.jsonl", video_estimates) write_html_summary( output_dir / "latency-estimates.html", net_epochs=net_epochs, control_estimates=control_estimates, video_estimates=video_estimates, ) return 0 if __name__ == "__main__": raise SystemExit(main())