293 lines
13 KiB
Python
293 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
import html
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Aggregate run logs into control/video latency estimate outputs.")
|
|
parser.add_argument("--run-dir", required=True, help="Run directory containing JSONL logs.")
|
|
parser.add_argument("--output-dir", help="Output directory. Defaults to --run-dir.")
|
|
return parser.parse_args()
|
|
|
|
|
|
def iter_jsonl(path: Path) -> list[dict[str, Any]]:
|
|
records: list[dict[str, Any]] = []
|
|
if not path.exists():
|
|
return records
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
for raw_line in handle:
|
|
line = raw_line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
payload = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if isinstance(payload, dict):
|
|
records.append(payload)
|
|
return records
|
|
|
|
|
|
def load_glob_jsonl(run_dir: Path, pattern: str) -> list[dict[str, Any]]:
|
|
records: list[dict[str, Any]] = []
|
|
for path in sorted(run_dir.glob(pattern)):
|
|
records.extend(iter_jsonl(path))
|
|
return records
|
|
|
|
|
|
def write_jsonl(path: Path, records: list[dict[str, Any]]) -> None:
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
for record in records:
|
|
handle.write(json.dumps(record, ensure_ascii=False, separators=(",", ":")))
|
|
handle.write("\n")
|
|
|
|
|
|
def parse_unix_ms(value: Any) -> int | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, (int, float)):
|
|
return int(value)
|
|
text = str(value).strip()
|
|
if not text:
|
|
return None
|
|
if text.endswith("Z"):
|
|
text = f"{text[:-1]}+00:00"
|
|
try:
|
|
return int(datetime.fromisoformat(text).astimezone(timezone.utc).timestamp() * 1000)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def flatten_net_epoch(samples: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
flattened: list[dict[str, Any]] = []
|
|
for sample in samples:
|
|
links = sample.get("links") or {}
|
|
a_to_d = (links.get("a_to_d") or {}).get("sessions") or {}
|
|
d_to_b = (links.get("d_to_b") or {}).get("sessions") or {}
|
|
a_control = (a_to_d.get("control") or {}).get("kcp") or {}
|
|
d_control = (d_to_b.get("control") or {}).get("kcp") or {}
|
|
a_video = (a_to_d.get("video") or {}).get("kcp") or {}
|
|
d_video = (d_to_b.get("video") or {}).get("kcp") or {}
|
|
flattened.append(
|
|
{
|
|
"updated_at": sample.get("updated_at"),
|
|
"a_to_d_control_srtt_ms": a_control.get("srtt_ms"),
|
|
"a_to_d_control_min_srtt_ms": a_control.get("min_srtt_ms"),
|
|
"d_to_b_control_srtt_ms": d_control.get("srtt_ms"),
|
|
"d_to_b_control_min_srtt_ms": d_control.get("min_srtt_ms"),
|
|
"a_to_d_video_srtt_ms": a_video.get("srtt_ms"),
|
|
"a_to_d_video_min_srtt_ms": a_video.get("min_srtt_ms"),
|
|
"d_to_b_video_srtt_ms": d_video.get("srtt_ms"),
|
|
"d_to_b_video_min_srtt_ms": d_video.get("min_srtt_ms"),
|
|
"a_to_d_control_feedback_age_ms": a_control.get("last_feedback_age_ms"),
|
|
"d_to_b_control_feedback_age_ms": d_control.get("last_feedback_age_ms"),
|
|
"a_to_d_video_feedback_age_ms": a_video.get("last_feedback_age_ms"),
|
|
"d_to_b_video_feedback_age_ms": d_video.get("last_feedback_age_ms"),
|
|
"a_to_d_control_retrans_delta": ((a_to_d.get("control") or {}).get("trend") or {}).get("retrans_delta"),
|
|
"d_to_b_control_retrans_delta": ((d_to_b.get("control") or {}).get("trend") or {}).get("retrans_delta"),
|
|
"a_to_d_video_retrans_delta": ((a_to_d.get("video") or {}).get("trend") or {}).get("retrans_delta"),
|
|
"d_to_b_video_retrans_delta": ((d_to_b.get("video") or {}).get("trend") or {}).get("retrans_delta"),
|
|
"a_to_d_video_window_pressure_pct": a_video.get("window_pressure_pct"),
|
|
"d_to_b_video_window_pressure_pct": d_video.get("window_pressure_pct"),
|
|
"robot_health": sample.get("robot_health"),
|
|
}
|
|
)
|
|
return flattened
|
|
|
|
|
|
def aggregate_control_estimates(
|
|
network_samples: list[dict[str, Any]],
|
|
control_events: list[dict[str, Any]],
|
|
control_acks: list[dict[str, Any]],
|
|
) -> list[dict[str, Any]]:
|
|
if control_acks:
|
|
return control_acks
|
|
|
|
fallback: list[dict[str, Any]] = []
|
|
for sample in network_samples:
|
|
estimate = sample.get("latency_estimate") or {}
|
|
fallback.append(
|
|
{
|
|
"updated_at": sample.get("updated_at"),
|
|
"estimate_method": "srtt_fallback",
|
|
"control_loop_rtt_ms": estimate.get("control_loop_rtt_ms"),
|
|
"control_to_persist_est_ms": estimate.get("control_to_persist_est_ms"),
|
|
"control_oneway_srtt_est_ms": estimate.get("control_oneway_srtt_est_ms"),
|
|
"control_oneway_bestcase_est_ms": estimate.get("control_oneway_bestcase_est_ms"),
|
|
"source_event_count": len(control_events),
|
|
}
|
|
)
|
|
return fallback
|
|
|
|
|
|
def aggregate_video_estimates(
|
|
network_samples: list[dict[str, Any]],
|
|
frame_recv_records: list[dict[str, Any]],
|
|
display_probe_records: list[dict[str, Any]],
|
|
) -> list[dict[str, Any]]:
|
|
network_timeline = sorted(
|
|
(
|
|
(updated_at_ms, sample.get("latency_estimate") or {})
|
|
for sample in network_samples
|
|
for updated_at_ms in [parse_unix_ms(sample.get("updated_at"))]
|
|
if updated_at_ms is not None
|
|
),
|
|
key=lambda item: item[0],
|
|
)
|
|
probes_by_seq = {
|
|
int(record["frame_seq"]): record
|
|
for record in display_probe_records
|
|
if record.get("frame_seq") is not None
|
|
}
|
|
estimates: list[dict[str, Any]] = []
|
|
timeline_index = 0
|
|
|
|
for record in frame_recv_records:
|
|
frame_seq = record.get("frame_seq")
|
|
if frame_seq is None:
|
|
continue
|
|
probe = probes_by_seq.get(int(frame_seq))
|
|
backend_received_unix_ns = record.get("backend_received_unix_ns")
|
|
backend_received_unix_ms = None
|
|
try:
|
|
if backend_received_unix_ns is not None:
|
|
backend_received_unix_ms = int(int(backend_received_unix_ns) / 1_000_000)
|
|
except (TypeError, ValueError):
|
|
backend_received_unix_ms = None
|
|
|
|
latency_estimate: dict[str, Any] = {}
|
|
if backend_received_unix_ms is not None and network_timeline:
|
|
while timeline_index + 1 < len(network_timeline) and network_timeline[timeline_index + 1][0] <= backend_received_unix_ms:
|
|
timeline_index += 1
|
|
if network_timeline[timeline_index][0] <= backend_received_unix_ms:
|
|
latency_estimate = network_timeline[timeline_index][1]
|
|
|
|
network_oneway = latency_estimate.get("video_network_oneway_est_ms")
|
|
capture_to_send = record.get("b_side_capture_to_send_ms")
|
|
partial_est = None
|
|
if capture_to_send is not None or network_oneway is not None:
|
|
partial_est = round(float(capture_to_send or 0.0) + float(network_oneway or 0.0), 3)
|
|
request_to_paint_ms = None
|
|
if probe is not None and probe.get("request_to_paint_ms") is not None:
|
|
request_to_paint_ms = round(float(probe["request_to_paint_ms"]), 3)
|
|
elif probe is not None and probe.get("request_started_unix_ms") is not None and probe.get("paint_unix_ms") is not None:
|
|
request_to_paint_ms = round(float(probe["paint_unix_ms"]) - float(probe["request_started_unix_ms"]), 3)
|
|
video_e2e_est_ms = round(partial_est + request_to_paint_ms, 3) if partial_est is not None and request_to_paint_ms is not None else None
|
|
estimates.append(
|
|
{
|
|
"frame_seq": frame_seq,
|
|
"backend_received_unix_ns": record.get("backend_received_unix_ns"),
|
|
"frame_hash": record.get("frame_hash"),
|
|
"estimate_method": "capture_to_send+srtt/2+request_to_paint" if video_e2e_est_ms is not None else "capture_to_send+srtt/2",
|
|
"video_network_oneway_est_ms": network_oneway,
|
|
"b_side_capture_to_send_ms": capture_to_send,
|
|
"request_to_paint_ms": request_to_paint_ms,
|
|
"response_to_paint_ms": probe.get("response_to_paint_ms") if probe is not None else None,
|
|
"backend_to_request_ms": probe.get("backend_to_request_ms") if probe is not None else None,
|
|
"backend_to_request_ms_raw": probe.get("backend_to_request_ms_raw") if probe is not None else None,
|
|
"backend_to_paint_ms": probe.get("backend_to_paint_ms") if probe is not None else None,
|
|
"backend_to_paint_ms_raw": probe.get("backend_to_paint_ms_raw") if probe is not None else None,
|
|
"browser_backend_clock_offset_ms": probe.get("browser_backend_clock_offset_ms") if probe is not None else None,
|
|
"browser_backend_clock_rtt_ms": probe.get("browser_backend_clock_rtt_ms") if probe is not None else None,
|
|
"video_partial_est_ms": partial_est,
|
|
"video_e2e_est_ms": video_e2e_est_ms,
|
|
"sequence_gap": record.get("sequence_gap"),
|
|
"repeat_flag": record.get("repeat_flag"),
|
|
"sender_clock_delta_ms_raw": record.get("sender_clock_delta_ms_raw"),
|
|
}
|
|
)
|
|
return estimates
|
|
|
|
|
|
def write_html_summary(
|
|
path: Path,
|
|
*,
|
|
net_epochs: list[dict[str, Any]],
|
|
control_estimates: list[dict[str, Any]],
|
|
video_estimates: list[dict[str, Any]],
|
|
) -> None:
|
|
latest_control = control_estimates[-1] if control_estimates else {}
|
|
latest_video = video_estimates[-1] if video_estimates else {}
|
|
latest_net = net_epochs[-1] if net_epochs else {}
|
|
html_text = f"""<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8">
|
|
<title>Latency Estimates</title>
|
|
<style>
|
|
body {{ font-family: Arial, sans-serif; margin: 24px; background: #0b1020; color: #eef2ff; }}
|
|
.grid {{ display: grid; grid-template-columns: repeat(3, minmax(0, 1fr)); gap: 16px; }}
|
|
.card {{ border: 1px solid #334155; border-radius: 8px; padding: 16px; background: #111827; }}
|
|
h1, h2 {{ margin-top: 0; }}
|
|
p {{ margin: 6px 0; line-height: 1.5; }}
|
|
code {{ color: #93c5fd; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1>Latency Estimates</h1>
|
|
<div class="grid">
|
|
<section class="card">
|
|
<h2>Control</h2>
|
|
<p><strong>loop RTT:</strong> {html.escape(str(latest_control.get("control_loop_rtt_ms")))}</p>
|
|
<p><strong>to persist:</strong> {html.escape(str(latest_control.get("control_to_persist_est_ms")))}</p>
|
|
<p><strong>method:</strong> {html.escape(str(latest_control.get("estimate_method")))}</p>
|
|
<p><strong>samples:</strong> {len(control_estimates)}</p>
|
|
</section>
|
|
<section class="card">
|
|
<h2>Video</h2>
|
|
<p><strong>network one-way:</strong> {html.escape(str(latest_video.get("video_network_oneway_est_ms")))}</p>
|
|
<p><strong>partial:</strong> {html.escape(str(latest_video.get("video_partial_est_ms")))}</p>
|
|
<p><strong>end-to-end:</strong> {html.escape(str(latest_video.get("video_e2e_est_ms")))}</p>
|
|
<p><strong>samples:</strong> {len(video_estimates)}</p>
|
|
</section>
|
|
<section class="card">
|
|
<h2>Net Epoch</h2>
|
|
<p><strong>a→d control srtt:</strong> {html.escape(str(latest_net.get("a_to_d_control_srtt_ms")))}</p>
|
|
<p><strong>d→b control srtt:</strong> {html.escape(str(latest_net.get("d_to_b_control_srtt_ms")))}</p>
|
|
<p><strong>a→d video srtt:</strong> {html.escape(str(latest_net.get("a_to_d_video_srtt_ms")))}</p>
|
|
<p><strong>d→b video srtt:</strong> {html.escape(str(latest_net.get("d_to_b_video_srtt_ms")))}</p>
|
|
</section>
|
|
</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
path.write_text(html_text, encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
run_dir = Path(args.run_dir).resolve()
|
|
output_dir = Path(args.output_dir).resolve() if args.output_dir else run_dir
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
network_samples = load_glob_jsonl(run_dir, "a-network-summary.*.jsonl")
|
|
control_events = load_glob_jsonl(run_dir, "a-control-events.*.jsonl")
|
|
control_acks = load_glob_jsonl(run_dir, "a-control-acks.*.jsonl")
|
|
frame_recv_records = load_glob_jsonl(run_dir, "a-video-frame-recv.*.jsonl")
|
|
display_probe_records = load_glob_jsonl(run_dir, "a-video-display-probe.*.jsonl")
|
|
|
|
net_epochs = flatten_net_epoch(network_samples)
|
|
control_estimates = aggregate_control_estimates(network_samples, control_events, control_acks)
|
|
video_estimates = aggregate_video_estimates(network_samples, frame_recv_records, display_probe_records)
|
|
|
|
write_jsonl(output_dir / "net-epoch-summary.jsonl", net_epochs)
|
|
write_jsonl(output_dir / "control-latency-estimates.jsonl", control_estimates)
|
|
write_jsonl(output_dir / "video-latency-estimates.jsonl", video_estimates)
|
|
write_html_summary(
|
|
output_dir / "latency-estimates.html",
|
|
net_epochs=net_epochs,
|
|
control_estimates=control_estimates,
|
|
video_estimates=video_estimates,
|
|
)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|