diff --git a/.gitignore b/.gitignore index f2503a86..911f72b8 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,7 @@ pixelpilot_config.h .vscode/ build/ build_sim/ +build-test/ .apt_cache/ output/ debian-*-generic-arm64.tar.xz diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a8cdaa2..45a026f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,6 +158,8 @@ set(LIB_SOURCE_FILES src/WiFiRSSIMonitor.hpp src/WiFiRSSIMonitor.cpp src/scheduling_helper.hpp + src/latency_probe.hpp + src/latency_probe.cpp src/gstrtpreceiver.cpp src/gstrtpreceiver.h) set(SOURCE_FILES @@ -249,6 +251,8 @@ if(BUILD_TESTS) # Test source files set(TEST_SOURCES tests/test_osd.cpp + tests/test_latency_probe.cpp + tests/test_latency_probe_integration.cpp src/main.h src/main.cpp ) diff --git a/config_osd.json b/config_osd.json index 3af98728..c4226e14 100644 --- a/config_osd.json +++ b/config_osd.json @@ -199,5 +199,10 @@ "height": 0, "facts": [] } - ] + ], + "latency_probe": { + "enable": false, + "host": "10.5.0.10", + "port": 5602 + } } diff --git a/src/gstrtpreceiver.cpp b/src/gstrtpreceiver.cpp index 63bfe544..77391425 100644 --- a/src/gstrtpreceiver.cpp +++ b/src/gstrtpreceiver.cpp @@ -4,6 +4,7 @@ // #include "gstrtpreceiver.h" +#include "latency_probe.hpp" #include "gst/gstparse.h" #include "gst/gstpipeline.h" #include "gst/net/gstnetaddressmeta.h" @@ -783,22 +784,32 @@ namespace { } static GstPadProbeReturn udp_last_hop_probe(GstPad*, GstPadProbeInfo* info, gpointer) { - if (!g_idr_enabled.load(std::memory_order_relaxed)) { + if (!(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER)) return GST_PAD_PROBE_OK; + GstBuffer* buf = GST_PAD_PROBE_INFO_BUFFER(info); + if (!buf) return GST_PAD_PROBE_OK; + + // Latency-probe hook runs first and is cheap when active==false. + if (latency_probe::active.load(std::memory_order_acquire)) { + GstMapInfo map; + if (gst_buffer_map(buf, &map, GST_MAP_READ)) { + latency_probe::on_rtp_buffer(map.data, map.size, + latency_probe::now_us()); + gst_buffer_unmap(buf, &map); + } } - if (GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER) { - GstBuffer* buf = GST_PAD_PROBE_INFO_BUFFER(info); - if (buf) { - on_incoming_stream_buffer(buf, "udpsrc"); - maybe_track_rtp_sequence(buf); - } + // Existing IDR-tracking path. + if (g_idr_enabled.load(std::memory_order_relaxed)) { + on_incoming_stream_buffer(buf, "udpsrc"); + maybe_track_rtp_sequence(buf); } return GST_PAD_PROBE_OK; } static void attach_last_hop_probes(GstElement* pipeline) { - if (!g_idr_enabled.load(std::memory_order_relaxed)) { + if (!g_idr_enabled.load(std::memory_order_relaxed) && + !latency_probe::active.load(std::memory_order_acquire)) { return; } diff --git a/src/latency_probe.cpp b/src/latency_probe.cpp new file mode 100644 index 00000000..4c1ced19 --- /dev/null +++ b/src/latency_probe.cpp @@ -0,0 +1,535 @@ +#include "latency_probe.hpp" +#include "latency_probe_wire.hpp" +#include "osd.h" +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace latency_probe { + +std::atomic active{false}; + +namespace { + +PublishUintFn g_pub_u_override; +PublishIntFn g_pub_i_override; + +// Module-internal state. There is one global instance protected by g_start_mu; +// the thread itself does not need additional locking against g_state — only +// the application driver calls start()/stop(). +struct ProbeState { + std::thread thread; + std::atomic stop_flag{false}; + int fd = -1; + sockaddr_in peer{}; + ClockOffset clock; + FrameMatcher matcher; + uint64_t wire_clamp_count = 0; + uint64_t started_us = 0; +}; + +std::mutex g_start_mu; +ProbeState* g_state = nullptr; + +// Latest "decode_and_handover" reading from the existing display-thread +// fact, fed in via record_gs_pipeline_ms(). Read by the publish callback +// at frame-publish time. Relaxed ordering is fine — the value is a smooth +// per-frame number; transient staleness across the GS pipeline pairing is +// acceptable. +std::atomic g_gs_pipeline_ms{0}; + +void publish_uint_real(const char* name, uint64_t value) { + osd_publish_uint_fact(name, NULL, 0, static_cast(value)); +} +void publish_int_real(const char* name, int64_t value) { + osd_publish_int_fact(name, NULL, 0, static_cast(value)); +} + +void send_subscribe(ProbeState& s) { + uint8_t buf[wire::kSizeSubscribe]; + encode_subscribe(buf); + sendto(s.fd, buf, sizeof(buf), 0, + reinterpret_cast(&s.peer), sizeof(s.peer)); +} + +void send_sync_req(ProbeState& s) { + uint8_t buf[wire::kSizeSyncReq]; + uint64_t t1 = now_us(); + encode_sync_req(buf, t1); + sendto(s.fd, buf, sizeof(buf), 0, + reinterpret_cast(&s.peer), sizeof(s.peer)); +} + +void handle_packet(ProbeState& s, const uint8_t* buf, size_t len) { + SyncRespFields sr{}; + MsgFrameFields mf{}; + uint8_t kind = decode_message(buf, len, sr, mf); + switch (kind) { + case wire::kMsgSyncResp: { + uint64_t t4 = now_us(); + s.clock.add_sample(sr.t1_us, sr.t2_us, sr.t3_us, t4); + break; + } + case wire::kMsgFrame: { + s.matcher.on_msg_frame(mf.ssrc, mf.rtp_timestamp, + mf.capture_us, mf.frame_ready_us, + mf.last_pkt_send_us, + /*now=*/now_us()); + break; + } + default: break; + } +} + +void probe_thread_main(ProbeState* sp) { + ProbeState& s = *sp; + constexpr uint64_t kSubInterval_us = 2'000'000; + constexpr uint64_t kSyncFast_us = 1'000'000; + constexpr uint64_t kSyncSlow_us = 5'000'000; + constexpr uint64_t kFastWindow_us = 10'000'000; + constexpr uint64_t kTtlSweepStep_us = 100'000; + constexpr uint64_t kTtl_us = 500'000; + + s.started_us = now_us(); + uint64_t next_subscribe = s.started_us; + uint64_t next_sync = s.started_us; + uint64_t next_sweep = s.started_us + kTtlSweepStep_us; + + // Wire publish callback. The matcher holds its own mutex while calling + // back, so this runs under that mutex — keep it cheap. + s.matcher.set_publish_callback( + [&](const FrameTimings& f) { + int64_t off; uint64_t rtt; + s.clock.get(off, rtt); + uint64_t gs_pipe = g_gs_pipeline_ms.load(std::memory_order_relaxed); + PublishUintFn pu = g_pub_u_override ? g_pub_u_override : publish_uint_real; + PublishIntFn pi = g_pub_i_override ? g_pub_i_override : publish_int_real; + compute_and_publish(f, off, rtt, gs_pipe, s.wire_clamp_count, pu, pi); + }); + + pollfd pfd{ s.fd, POLLIN, 0 }; + + while (!s.stop_flag.load(std::memory_order_relaxed)) { + uint64_t now = now_us(); + + if (now >= next_subscribe) { + send_subscribe(s); + next_subscribe = now + kSubInterval_us; + } + if (now >= next_sync) { + send_sync_req(s); + uint64_t step = (now - s.started_us < kFastWindow_us) + ? kSyncFast_us : kSyncSlow_us; + next_sync = now + step; + } + if (now >= next_sweep) { + s.matcher.ttl_sweep(now, kTtl_us); + next_sweep = now + kTtlSweepStep_us; + } + + uint64_t soonest = std::min({next_subscribe, next_sync, next_sweep}); + int timeout_ms = (soonest > now) ? static_cast((soonest - now) / 1000ull) : 0; + if (timeout_ms < 1) timeout_ms = 1; + if (timeout_ms > 100) timeout_ms = 100; + + int prc = poll(&pfd, 1, timeout_ms); + if (prc > 0 && (pfd.revents & POLLIN)) { + for (int i = 0; i < 8; ++i) { + uint8_t buf[128]; + ssize_t n = recv(s.fd, buf, sizeof(buf), MSG_DONTWAIT); + if (n <= 0) break; + handle_packet(s, buf, static_cast(n)); + } + } + } +} + +bool resolve_peer(const std::string& host, uint16_t port, sockaddr_in& out) { + memset(&out, 0, sizeof(out)); + out.sin_family = AF_INET; + out.sin_port = htons(port); + if (inet_pton(AF_INET, host.c_str(), &out.sin_addr) == 1) return true; + addrinfo hints{}, *res = nullptr; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + if (getaddrinfo(host.c_str(), nullptr, &hints, &res) != 0 || !res) return false; + auto* a = reinterpret_cast(res->ai_addr); + out.sin_addr = a->sin_addr; + freeaddrinfo(res); + return true; +} + +} // namespace + +bool start(const std::string& host, uint16_t port) { + std::lock_guard lk(g_start_mu); + if (g_state) return true; + if (port == 0 || host.empty()) { + spdlog::warn("[latency-probe] disabled: empty host/port"); + return false; + } + + auto s = std::make_unique(); + if (!resolve_peer(host, port, s->peer)) { + spdlog::warn("[latency-probe] failed to resolve {}:{}", host, port); + return false; + } + + s->fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); + if (s->fd < 0) { + spdlog::warn("[latency-probe] socket: {}", strerror(errno)); + return false; + } + int flags = fcntl(s->fd, F_GETFL, 0); + if (flags < 0 || fcntl(s->fd, F_SETFL, flags | O_NONBLOCK) < 0) { + spdlog::warn("[latency-probe] fcntl O_NONBLOCK: {}", strerror(errno)); + close(s->fd); + return false; + } + + g_state = s.release(); + g_state->thread = std::thread(probe_thread_main, g_state); + active.store(true, std::memory_order_release); + spdlog::info("[latency-probe] started → {}:{}", host, port); + return true; +} + +void stop() { + ProbeState* s = nullptr; + { + std::lock_guard lk(g_start_mu); + s = g_state; + g_state = nullptr; + } + if (!s) return; + active.store(false, std::memory_order_release); + s->stop_flag.store(true, std::memory_order_release); + if (s->thread.joinable()) s->thread.join(); + if (s->fd >= 0) close(s->fd); + delete s; +} + +// g_state is read without a lock in the hot-path hooks. The `active` atomic +// is the gate: it's flipped to `true` AFTER g_state is fully initialized +// (release store at end of start()), and flipped to `false` BEFORE g_state +// is torn down (release store at the top of stop()). The hot-path acquire +// load synchronizes-with that release, making the prior write to g_state +// visible without a lock. +void on_rtp_buffer(const uint8_t* data, size_t len, uint64_t gs_recv_us) { + if (!active.load(std::memory_order_acquire)) return; + ProbeState* s = g_state; + if (!s) return; + RtpHeaderInfo h; + if (!parse_rtp_header(data, len, h)) return; + if (!h.marker) return; + s->matcher.on_marker_arrival(h.ssrc, h.timestamp, gs_recv_us, gs_recv_us); +} + +void record_gs_pipeline_ms(uint64_t gs_pipeline_ms) { + if (!active.load(std::memory_order_acquire)) return; + g_gs_pipeline_ms.store(gs_pipeline_ms, std::memory_order_relaxed); +} + +bool parse_rtp_header(const uint8_t* data, size_t len, RtpHeaderInfo& info) { + if (!data || len < 12) return false; + if ((data[0] >> 6) != 2) return false; // version must be 2 + + info.marker = (data[1] & 0x80) != 0; + info.timestamp = (static_cast(data[4]) << 24) | + (static_cast(data[5]) << 16) | + (static_cast(data[6]) << 8) | + (static_cast(data[7])); + info.ssrc = (static_cast(data[8]) << 24) | + (static_cast(data[9]) << 16) | + (static_cast(data[10]) << 8) | + (static_cast(data[11])); + return true; +} + +uint64_t now_us() { + struct timespec ts{}; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1'000'000ull + + static_cast(ts.tv_nsec) / 1'000ull; +} + +void ClockOffset::add_sample(uint64_t t1, uint64_t t2, uint64_t t3, uint64_t t4) { + // rtt_us = (t4 - t1) - (t3 - t2). Both subtractions can be negative + // in pathological samples (out-of-order delivery, clock jumps). In + // that case we treat rtt as huge so this sample never wins. + uint64_t rtt; + if (t4 >= t1 && t3 >= t2 && (t4 - t1) >= (t3 - t2)) + rtt = (t4 - t1) - (t3 - t2); + else + rtt = std::numeric_limits::max(); + + // offset_us = ((t2 - t1) + (t3 - t4)) / 2, signed arithmetic. + int64_t off = ((static_cast(t2) - static_cast(t1)) + + (static_cast(t3) - static_cast(t4))) / 2; + + std::lock_guard lk(m_); + Sample& slot = samples_[next_]; + bool overwriting_best = + have_best_ && slot.valid && + slot.rtt_us == best_rtt_us_ && slot.offset_us == best_offset_us_; + + slot.valid = true; + slot.offset_us = off; + slot.rtt_us = rtt; + slot.taken_us = 0; // not used; kept for future drift handling + next_ = (next_ + 1) % kRing; + + if (!have_best_ || rtt < best_rtt_us_) { + best_rtt_us_ = rtt; + best_offset_us_ = off; + have_best_ = true; + return; + } + if (overwriting_best) { + // Re-scan the ring to pick the new minimum. + uint64_t new_best_rtt = std::numeric_limits::max(); + int64_t new_best_off = 0; + bool any = false; + for (const auto& s : samples_) { + if (s.valid && s.rtt_us < new_best_rtt) { + new_best_rtt = s.rtt_us; + new_best_off = s.offset_us; + any = true; + } + } + have_best_ = any; + best_rtt_us_ = any ? new_best_rtt : 0; + best_offset_us_ = any ? new_best_off : 0; + } +} + +void ClockOffset::get(int64_t& offset_us, uint64_t& rtt_us) const { + std::lock_guard lk(m_); + if (!have_best_) { offset_us = 0; rtt_us = 0; return; } + offset_us = best_offset_us_; + rtt_us = best_rtt_us_; +} + +static bool is_complete(const FrameTimings& f) { + return f.sidecar_seen && f.gs_recv_last_us != 0; +} + +void FrameMatcher::set_publish_callback(PublishFn cb) { + std::lock_guard lk(m_); + publish_ = std::move(cb); +} + +size_t FrameMatcher::size() const { + std::lock_guard lk(m_); + return ring_.size(); +} + +FrameTimings* FrameMatcher::find_by_key_locked(uint32_t ssrc, uint32_t rtp_ts) { + for (auto& f : ring_) { + if (f.ssrc == ssrc && f.rtp_ts == rtp_ts) return &f; + } + return nullptr; +} + +FrameTimings& FrameMatcher::push_new_locked(uint64_t now) { + if (ring_.size() >= kRingCap) ring_.pop_front(); + ring_.emplace_back(); + ring_.back().inserted_us = now; + return ring_.back(); +} + +void FrameMatcher::try_publish_for_locked(FrameTimings& slot) { + if (!is_complete(slot)) return; + if (publish_) publish_(slot); + for (auto it = ring_.begin(); it != ring_.end(); ++it) { + if (&(*it) == &slot) { ring_.erase(it); return; } + } +} + +void FrameMatcher::on_marker_arrival(uint32_t ssrc, uint32_t rtp_ts, + uint64_t gs_recv_us, uint64_t now) { + std::lock_guard lk(m_); + FrameTimings* slot = find_by_key_locked(ssrc, rtp_ts); + if (slot) { + if (slot->gs_recv_last_us == 0) slot->gs_recv_last_us = gs_recv_us; + } else { + slot = &push_new_locked(now); + slot->ssrc = ssrc; + slot->rtp_ts = rtp_ts; + slot->gs_recv_last_us = gs_recv_us; + } + try_publish_for_locked(*slot); +} + +void FrameMatcher::on_msg_frame(uint32_t ssrc, uint32_t rtp_ts, + uint64_t capture_us, uint64_t frame_ready_us, + uint64_t last_pkt_send_us, uint64_t now) { + std::lock_guard lk(m_); + FrameTimings* slot = find_by_key_locked(ssrc, rtp_ts); + if (!slot) { + slot = &push_new_locked(now); + slot->ssrc = ssrc; + slot->rtp_ts = rtp_ts; + } + slot->capture_us = capture_us; + slot->frame_ready_us = frame_ready_us; + slot->last_pkt_send_us = last_pkt_send_us; + slot->sidecar_seen = true; + try_publish_for_locked(*slot); +} + +void FrameMatcher::ttl_sweep(uint64_t now, uint64_t ttl_us) { + std::lock_guard lk(m_); + while (!ring_.empty() && now - ring_.front().inserted_us > ttl_us) { + ring_.pop_front(); + } +} + +void compute_and_publish(const FrameTimings& f, + int64_t offset_us, + uint64_t rtt_us, + uint64_t gs_pipeline_ms, + uint64_t& wire_clamp_counter, + PublishUintFn pub_u, + PublishIntFn pub_i) { + bool have_capture = f.capture_us != 0; + // rtt_us == 0 is ClockOffset::get()'s sentinel for "no sync sample yet". + // Without an offset, gs_recv_last_us (GS CLOCK_MONOTONIC) minus + // last_pkt_send_us (drone CLOCK_MONOTONIC) is (GS_uptime − drone_uptime) + // — two independent boot clocks, often off by hours. Publishing wire_ms + // before sync causes the field-observed "absurdly high latency at startup + // then settles to ~50 ms" symptom; suppress until first MSG_SYNC_RESP. + bool have_sync = rtt_us != 0; + + uint64_t cap_to_enc_us = 0; + uint64_t cap_to_enc_ms = 0; + uint64_t enc_to_send_us = 0; + uint64_t enc_to_send_ms = 0; + if (have_capture && f.frame_ready_us >= f.capture_us) { + cap_to_enc_us = f.frame_ready_us - f.capture_us; + cap_to_enc_ms = cap_to_enc_us / 1000ull; + } + if (f.last_pkt_send_us >= f.frame_ready_us) { + enc_to_send_us = f.last_pkt_send_us - f.frame_ready_us; + enc_to_send_ms = enc_to_send_us / 1000ull; + } + + if (have_capture) { + pub_u("video.latency.capture_to_encode_ms", cap_to_enc_ms); + pub_u("video.latency.capture_to_encode_us", cap_to_enc_us); + } + pub_u("video.latency.encode_to_send_ms", enc_to_send_ms); + pub_u("video.latency.encode_to_send_us", enc_to_send_us); + + if (have_sync) { + int64_t adjusted_send_us = + static_cast(f.last_pkt_send_us) - offset_us; + int64_t wire_us = static_cast(f.gs_recv_last_us) - adjusted_send_us; + if (wire_us < 0) { + wire_us = 0; + wire_clamp_counter++; + } + uint64_t wire_ms = static_cast(wire_us) / 1000ull; + pub_u("video.latency.wire_ms", wire_ms); + pub_u("video.latency.total_ms", + cap_to_enc_ms + enc_to_send_ms + wire_ms + gs_pipeline_ms); + } + + pub_i("video.latency.clock_offset_us", offset_us); + pub_u("video.latency.clock_rtt_us", rtt_us); + pub_u("video.latency.wire_clamp_count", wire_clamp_counter); +} + +void set_publish_overrides_for_test(PublishUintFn pub_u, PublishIntFn pub_i) { + g_pub_u_override = std::move(pub_u); + g_pub_i_override = std::move(pub_i); +} + +namespace { + +inline void write_be32(uint8_t* p, uint32_t v) { + p[0] = (v >> 24) & 0xff; + p[1] = (v >> 16) & 0xff; + p[2] = (v >> 8) & 0xff; + p[3] = (v) & 0xff; +} + +inline void write_be64(uint8_t* p, uint64_t v) { + for (int i = 0; i < 8; ++i) p[i] = (v >> (56 - 8 * i)) & 0xff; +} + +inline uint32_t read_be32(const uint8_t* p) { + return (static_cast(p[0]) << 24) | + (static_cast(p[1]) << 16) | + (static_cast(p[2]) << 8) | + (static_cast(p[3])); +} + +inline uint64_t read_be64(const uint8_t* p) { + uint64_t v = 0; + for (int i = 0; i < 8; ++i) + v = (v << 8) | static_cast(p[i]); + return v; +} + +} // namespace + +void encode_subscribe(uint8_t* out) { + write_be32(out, wire::kMagic); + out[4] = wire::kVersion; + out[5] = wire::kMsgSubscribe; + out[6] = 0; + out[7] = 0; +} + +void encode_sync_req(uint8_t* out, uint64_t t1_us) { + write_be32(out, wire::kMagic); + out[4] = wire::kVersion; + out[5] = wire::kMsgSyncReq; + out[6] = 0; + out[7] = 0; + write_be64(out + 8, t1_us); +} + +uint8_t decode_message(const uint8_t* in, size_t len, + SyncRespFields& sr, MsgFrameFields& mf) { + if (!in || len < 6) return 0; + if (read_be32(in) != wire::kMagic) return 0; + if (in[4] != wire::kVersion) return 0; + + uint8_t msg = in[5]; + switch (msg) { + case wire::kMsgSyncResp: + if (len < wire::kSizeSyncResp) return 0; + sr.t1_us = read_be64(in + 8); + sr.t2_us = read_be64(in + 16); + sr.t3_us = read_be64(in + 24); + return msg; + + case wire::kMsgFrame: + if (len < wire::kSizeFrame) return 0; + mf.ssrc = read_be32(in + 8); + mf.rtp_timestamp = read_be32(in + 12); + mf.frame_ready_us = read_be64(in + 24); + mf.capture_us = read_be64(in + 36); + mf.last_pkt_send_us = read_be64(in + 44); + return msg; + + default: + return 0; + } +} + +} // namespace latency_probe diff --git a/src/latency_probe.hpp b/src/latency_probe.hpp new file mode 100644 index 00000000..68695e4e --- /dev/null +++ b/src/latency_probe.hpp @@ -0,0 +1,180 @@ +#ifndef LATENCY_PROBE_HPP +#define LATENCY_PROBE_HPP + +#include +#include +#include +#include // size_t +#include +#include +#include +#include +#include +#include + +namespace latency_probe { + +struct RtpHeaderInfo { + uint32_t ssrc; + uint32_t timestamp; + bool marker; +}; + +// Parses the fixed 12-byte RTP header (no CSRC, no extension). +// Returns true and fills info on success, false on short buffer or +// version != 2. Does NOT validate CSRC count or extension — we only +// need ssrc/timestamp/marker which all sit in the fixed prefix. +bool parse_rtp_header(const uint8_t* data, size_t len, RtpHeaderInfo& info); + +// Cheap-to-check gate for hot paths. Loaded with acquire memory order; +// changes only at start()/stop() which the application drives at startup +// and shutdown. +extern std::atomic active; + +// Lifecycle. host is the drone's IP, port is the sidecar UDP port +// (typical default 5602). Returns true on success. +// Safe to call when already started (no-op). Safe to call with active==false. +bool start(const std::string& host, uint16_t port); +void stop(); + +// Hot-path hook: called from the GStreamer pad probe for every RTP +// packet received on the video udpsrc. data points at the raw RTP packet +// (header + payload); len is the full packet length. Implementation +// parses the header and acts only on packets with marker=1. +// No-op when active==false. +void on_rtp_buffer(const uint8_t* data, size_t len, uint64_t gs_recv_us); + +// Hot-path hook: called from the display thread after a real video-frame +// commit (NOT on OSD-only redraws), passing the value already computed +// for the existing `video.decode_and_handover_ms` fact. Stored as the +// "GS pipeline" component of the per-frame total. +// No-op when active==false. +void record_gs_pipeline_ms(uint64_t gs_pipeline_ms); + +// Returns a CLOCK_MONOTONIC sample in microseconds. Convenience for callers. +uint64_t now_us(); + +class ClockOffset { +public: + // t1: probe send time (GS clock, us) + // t2: drone recv time (drone clock, us) + // t3: drone reply send time (drone clock, us) + // t4: probe recv time (GS clock, us) + void add_sample(uint64_t t1, uint64_t t2, uint64_t t3, uint64_t t4); + + // Out-params for current best offset/rtt. Returns 0/0 until at + // least one sample is recorded. + // Convention: GS_clock + offset_us == drone_clock. + void get(int64_t& offset_us, uint64_t& rtt_us) const; + +private: + struct Sample { + bool valid; + int64_t offset_us; + uint64_t rtt_us; + uint64_t taken_us; // wall-monotonic timestamp of insertion + }; + static constexpr size_t kRing = 16; + + mutable std::mutex m_; + std::array samples_{}; + size_t next_ = 0; + int64_t best_offset_us_ = 0; + uint64_t best_rtt_us_ = 0; + bool have_best_ = false; +}; + +struct FrameTimings { + uint32_t ssrc = 0; + uint32_t rtp_ts = 0; + uint64_t gs_recv_last_us = 0; // pad probe (marker=1) + uint64_t capture_us = 0; // from MSG_FRAME (drone clock) + uint64_t frame_ready_us = 0; // from MSG_FRAME (drone clock) + uint64_t last_pkt_send_us = 0; // from MSG_FRAME (drone clock) + bool sidecar_seen = false; + uint64_t inserted_us = 0; // for TTL +}; + +// Pairs RTP marker-bit arrivals (from the pad probe) with MSG_FRAME +// (from the sidecar) by (ssrc, rtp_timestamp). The two halves can arrive +// in either order. A slot publishes when both are present. Older orphans +// are aged out by ttl_sweep. +class FrameMatcher { +public: + using PublishFn = std::function; + static constexpr size_t kRingCap = 64; + + void set_publish_callback(PublishFn cb); + + // All entry points take 'now' separately so tests can drive time + // deterministically. In production, callers pass latency_probe::now_us(). + void on_marker_arrival(uint32_t ssrc, uint32_t rtp_ts, + uint64_t gs_recv_us, uint64_t now); + void on_msg_frame(uint32_t ssrc, uint32_t rtp_ts, + uint64_t capture_us, uint64_t frame_ready_us, + uint64_t last_pkt_send_us, uint64_t now); + + // Evict slots older than ttl_us. + void ttl_sweep(uint64_t now, uint64_t ttl_us); + + size_t size() const; + +private: + mutable std::mutex m_; + std::deque ring_; + PublishFn publish_; + + // Caller holds m_. + FrameTimings* find_by_key_locked(uint32_t ssrc, uint32_t rtp_ts); + FrameTimings& push_new_locked(uint64_t now); + void try_publish_for_locked(FrameTimings& slot); // publishes + erases + // the given slot if + // complete. +}; + +// Test scaffolding: captured-facts vector used by tests in place of +// the real osd_publish_*_fact calls. +struct PublishedFacts { + std::vector> uint_facts; + std::vector> int_facts; +}; + +using PublishUintFn = std::function; +using PublishIntFn = std::function; + +// Test-only: override the OSD publish path with custom callbacks. Pass +// nullptr/nullptr to revert to the real osd_publish_*_fact functions. +// Has effect only on subsequent publishes. +void set_publish_overrides_for_test(PublishUintFn pub_u, PublishIntFn pub_i); + +// Compute all video.latency.* values from one matched FrameTimings + the +// current clock offset/rtt + the latest GS-pipeline ms (from the existing +// `video.decode_and_handover_ms` fact, fed via record_gs_pipeline_ms), and +// emit via the provided callbacks. Mutates wire_clamp_counter if the +// computed wire delta would have been negative. +// +// capture_us == 0 means the drone did not provide a PTS-derived capture +// time; in that case capture_to_encode_ms / capture_to_encode_us are +// suppressed. +// +// rtt_us == 0 is the "no sync yet" sentinel from ClockOffset::get(). In +// that state wire_ms and total_ms are suppressed (cross-clock arithmetic +// would be meaningless) and wire_clamp_counter is not incremented. The +// drone-local segments and the clock diagnostic facts (clock_offset_us, +// clock_rtt_us=0, wire_clamp_count) still publish so the OSD can see that +// sync hasn't converged. +// +// total_ms = capture_to_encode_ms + encode_to_send_ms + wire_ms + gs_pipeline_ms. +// Published only when sync is available (any other missing summand +// contributes 0). +void compute_and_publish(const FrameTimings& f, + int64_t offset_us, + uint64_t rtt_us, + uint64_t gs_pipeline_ms, + uint64_t& wire_clamp_counter, + PublishUintFn pub_u, + PublishIntFn pub_i); + +} // namespace latency_probe + +#endif // LATENCY_PROBE_HPP diff --git a/src/latency_probe_wire.hpp b/src/latency_probe_wire.hpp new file mode 100644 index 00000000..41ec2417 --- /dev/null +++ b/src/latency_probe_wire.hpp @@ -0,0 +1,63 @@ +#ifndef LATENCY_PROBE_WIRE_HPP +#define LATENCY_PROBE_WIRE_HPP + +#include +#include + +namespace latency_probe { + +// Mirrors waybeam_venc/include/rtp_sidecar.h. Keep in sync when the +// drone protocol bumps RTP_SIDECAR_VERSION. +namespace wire { + +constexpr uint32_t kMagic = 0x52545053u; // "RTPS" +constexpr uint8_t kVersion = 1; + +enum : uint8_t { + kMsgSubscribe = 1, + kMsgFrame = 2, + kMsgSyncReq = 3, + kMsgSyncResp = 4, +}; + +constexpr uint8_t kFlagKeyframe = 0x01; +constexpr uint8_t kFlagEncInfo = 0x02; +constexpr uint8_t kFlagTransportInfo = 0x04; + +// All sizes match the packed structs on the wire (network byte order). +constexpr size_t kSizeSubscribe = 8; +constexpr size_t kSizeSyncReq = 16; +constexpr size_t kSizeSyncResp = 32; +constexpr size_t kSizeFrame = 52; + +} // namespace wire + +struct SyncRespFields { + uint64_t t1_us; + uint64_t t2_us; + uint64_t t3_us; +}; + +struct MsgFrameFields { + uint32_t ssrc; + uint32_t rtp_timestamp; + uint64_t frame_ready_us; + uint64_t capture_us; + uint64_t last_pkt_send_us; +}; + +// Serialise SUBSCRIBE into out[0..kSizeSubscribe). +void encode_subscribe(uint8_t* out); + +// Serialise SYNC_REQ with the given t1 (GS clock us) into out[0..kSizeSyncReq). +void encode_sync_req(uint8_t* out, uint64_t t1_us); + +// Parse incoming UDP datagram. Returns the message type on success, or 0 +// on any validation failure. Fills the appropriate out param. +uint8_t decode_message(const uint8_t* in, size_t len, + SyncRespFields& sr_out, + MsgFrameFields& mf_out); + +} // namespace latency_probe + +#endif // LATENCY_PROBE_WIRE_HPP diff --git a/src/main.cpp b/src/main.cpp index 03a07ce2..4055bfda 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -59,6 +59,7 @@ extern "C" { #include "pixelpilot_config.h" #include #include "WiFiRSSIMonitor.hpp" +#include "latency_probe.hpp" #include "gsmenu/gs_system.h" #include "gsmenu/air_actions.h" #include "gsmenu/gs_actions.h" @@ -451,6 +452,13 @@ void *__DISPLAY_THREAD__(void *param) osd_publish_uint_fact("video.displayed_frame", NULL, 0, 1); uint64_t decode_and_handover_display_ms=get_time_ms()-decoding_pts; osd_publish_uint_fact("video.decode_and_handover_ms", NULL, 0, decode_and_handover_display_ms); + // Forward the same number to latency_probe so it can roll the + // GS-pipeline component into video.latency.total_ms. Only on + // real video commits (fb_id != 0) — OSD-only commits would + // publish 0 here and dilute the rolling figure. + if (fb_id != 0) { + latency_probe::record_gs_pipeline_ms(decode_and_handover_display_ms); + } } end: spdlog::info("Display thread done."); @@ -1691,6 +1699,23 @@ int main(int argc, char **argv) } else { osd_config = {}; } + // Glass-to-glass latency probe (default disabled). + { + bool lp_enable = false; + std::string lp_host; + uint16_t lp_port = 5602; + try { + if (osd_config.contains("latency_probe")) { + const auto& lp = osd_config["latency_probe"]; + if (lp.contains("enable")) lp_enable = lp["enable"].get(); + if (lp.contains("host")) lp_host = lp["host"].get(); + if (lp.contains("port")) lp_port = static_cast(lp["port"].get()); + } + if (lp_enable) latency_probe::start(lp_host, lp_port); + } catch (const std::exception& e) { + spdlog::warn("[latency-probe] init error: {}", e.what()); + } + } if (mavlink_thread) { ret = pthread_create(&tid_mavlink, NULL, __MAVLINK_THREAD__, &signal_flag); assert(!ret); @@ -1809,6 +1834,7 @@ int main(int argc, char **argv) remove(pidFilePath.c_str()); restore_stdin(); + latency_probe::stop(); return return_value; } diff --git a/tests/osd_stub.cpp b/tests/osd_stub.cpp new file mode 100644 index 00000000..cb561edf --- /dev/null +++ b/tests/osd_stub.cpp @@ -0,0 +1,9 @@ +// No-op OSD pubsub implementations for the standalone latency_probe tests. +// The tests override the publish path via set_publish_overrides_for_test(), +// so these symbols only need to satisfy the linker. +#include "osd.h" + +extern "C" { +void osd_publish_uint_fact(char const*, osd_tag*, int, ulong) {} +void osd_publish_int_fact (char const*, osd_tag*, int, long) {} +} diff --git a/tests/run_latency_tests.sh b/tests/run_latency_tests.sh new file mode 100755 index 00000000..456154bd --- /dev/null +++ b/tests/run_latency_tests.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# Standalone test runner for the latency_probe unit + integration tests. +# Bypasses the full CMake build (which needs Rockchip MPP/RGA and DRM). +# Requires: g++ (C++17), Catch2 headers, libspdlog, libfmt on the include/link path. +set -euo pipefail + +HERE=$(cd "$(dirname "$0")" && pwd) +ROOT=$(cd "$HERE/.." && pwd) +OUT="$ROOT/build-test" +mkdir -p "$OUT" + +# -DUSE_SIMULATOR makes src/osd.h skip its drm.h include, so we can pull +# in osd.h without the full Rockchip toolchain. +g++ -std=c++17 -Wall -Wextra -O0 -g \ + -DUSE_SIMULATOR \ + -I"$ROOT/src" \ + -o "$OUT/latency_probe_tests" \ + "$HERE/standalone_main.cpp" \ + "$HERE/test_latency_probe.cpp" \ + "$HERE/test_latency_probe_integration.cpp" \ + "$HERE/osd_stub.cpp" \ + "$ROOT/src/latency_probe.cpp" \ + -lspdlog -lfmt -lpthread + +"$OUT/latency_probe_tests" "$@" diff --git a/tests/standalone_main.cpp b/tests/standalone_main.cpp new file mode 100644 index 00000000..bab27dec --- /dev/null +++ b/tests/standalone_main.cpp @@ -0,0 +1,5 @@ +// Standalone Catch2 v2 main for the latency_probe test runner. +// The CMake build uses Catch2WithMain; this file is only compiled into +// the bash-driven standalone runner under tests/run_latency_tests.sh. +#define CATCH_CONFIG_MAIN +#include diff --git a/tests/test_latency_probe.cpp b/tests/test_latency_probe.cpp new file mode 100644 index 00000000..1a15aa30 --- /dev/null +++ b/tests/test_latency_probe.cpp @@ -0,0 +1,476 @@ +#include +#include +#include + +#include "../src/latency_probe.hpp" +#include "../src/latency_probe_wire.hpp" + +namespace lp = latency_probe; + +namespace { +// Helper: build a synthetic RTP header with the given fields. +// Returns a 12-byte vector. +std::vector make_header(uint8_t version, bool marker, + uint32_t timestamp, uint32_t ssrc) { + std::vector b(12, 0); + b[0] = (version & 0x03) << 6; // V=version, P=0, X=0, CC=0 + b[1] = (marker ? 0x80 : 0x00) | 0x60; // M + PT=96 (dyn) + b[2] = 0; b[3] = 0; // seq + b[4] = (timestamp >> 24) & 0xff; + b[5] = (timestamp >> 16) & 0xff; + b[6] = (timestamp >> 8) & 0xff; + b[7] = (timestamp ) & 0xff; + b[8] = (ssrc >> 24) & 0xff; + b[9] = (ssrc >> 16) & 0xff; + b[10] = (ssrc >> 8) & 0xff; + b[11] = (ssrc ) & 0xff; + return b; +} +} + +TEST_CASE("RTP header parse: marker=1, ssrc, timestamp extracted", + "[latency_probe][rtp]") { + auto h = make_header(/*version=*/2, /*marker=*/true, + /*ts=*/0x11223344u, /*ssrc=*/0xDEADBEEFu); + + lp::RtpHeaderInfo info; + REQUIRE(lp::parse_rtp_header(h.data(), h.size(), info)); + REQUIRE(info.marker == true); + REQUIRE(info.timestamp == 0x11223344u); + REQUIRE(info.ssrc == 0xDEADBEEFu); +} + +TEST_CASE("RTP header parse: marker=0 still parses but flags off", + "[latency_probe][rtp]") { + auto h = make_header(2, false, 1u, 2u); + lp::RtpHeaderInfo info; + REQUIRE(lp::parse_rtp_header(h.data(), h.size(), info)); + REQUIRE(info.marker == false); + REQUIRE(info.timestamp == 1u); + REQUIRE(info.ssrc == 2u); +} + +TEST_CASE("RTP header parse: wrong version rejected", + "[latency_probe][rtp]") { + auto h = make_header(/*version=*/3, true, 1u, 2u); + lp::RtpHeaderInfo info; + REQUIRE_FALSE(lp::parse_rtp_header(h.data(), h.size(), info)); +} + +TEST_CASE("RTP header parse: non-zero CSRC count still parses (CC bits ignored)", + "[latency_probe][rtp]") { + // The parser only reads bytes 0..11. CSRCs sit AFTER byte 11, so they + // do not affect us — but we should prove that a CC>0 packet still + // parses correctly. Real RTP packets do sometimes carry CSRCs. + auto h = make_header(2, true, 0x11223344u, 0xDEADBEEFu); + h[0] |= 0x03; // CC = 3 (three CSRC entries would follow if we read further) + lp::RtpHeaderInfo info; + REQUIRE(lp::parse_rtp_header(h.data(), h.size(), info)); + REQUIRE(info.timestamp == 0x11223344u); + REQUIRE(info.ssrc == 0xDEADBEEFu); + REQUIRE(info.marker == true); +} + +TEST_CASE("RTP header parse: extension bit set still parses", + "[latency_probe][rtp]") { + // Extension header sits after CSRC list; the fixed 12-byte prefix + // is unaffected. + auto h = make_header(2, true, 0x11223344u, 0xDEADBEEFu); + h[0] |= 0x10; // X bit + lp::RtpHeaderInfo info; + REQUIRE(lp::parse_rtp_header(h.data(), h.size(), info)); + REQUIRE(info.timestamp == 0x11223344u); + REQUIRE(info.ssrc == 0xDEADBEEFu); + REQUIRE(info.marker == true); +} + +TEST_CASE("RTP header parse: too short rejected", + "[latency_probe][rtp]") { + auto h = make_header(2, true, 1u, 2u); + lp::RtpHeaderInfo info; + REQUIRE_FALSE(lp::parse_rtp_header(h.data(), 11, info)); + REQUIRE_FALSE(lp::parse_rtp_header(nullptr, 0, info)); +} + +TEST_CASE("ClockOffset: returns 0/0 before any sample", + "[latency_probe][clock]") { + lp::ClockOffset c; + int64_t off = 999; + uint64_t rtt = 999; + c.get(off, rtt); + REQUIRE(off == 0); + REQUIRE(rtt == 0); +} + +TEST_CASE("ClockOffset: picks min-RTT sample", + "[latency_probe][clock]") { + lp::ClockOffset c; + + // Sample 1: drone is +1000us ahead, RTT=200us + // t1=1000, t2=1500 (drone), t3=1600 (drone), t4=1200 + // rtt = (t4-t1) - (t3-t2) = 200 - 100 = 100 (NOTE: small RTT here) + // offset = ((t2-t1) + (t3-t4))/2 = (500 + 400)/2 = 450 + c.add_sample(1000, 1500, 1600, 1200); + + // Sample 2: same offset, but with extra queueing in one direction. + // t1=2000, t2=2500 (true offset still ~500), t3=2600, t4=2800 + // rtt = 800 - 100 = 700 + // offset = (500 + -200)/2 = 150 (BIASED by queueing) + c.add_sample(2000, 2500, 2600, 2800); + + int64_t off = 0; uint64_t rtt = 0; + c.get(off, rtt); + REQUIRE(rtt == 100); // min-RTT picked + REQUIRE(off == 450); // its offset, not the biased one +} + +TEST_CASE("ClockOffset: rescans after ring eviction of current best", + "[latency_probe][clock]") { + lp::ClockOffset c; + + // First sample becomes best (RTT=0). + c.add_sample(0, 100, 110, 10); // rtt = 10-0 - 10 = 0 (use generous arith) + int64_t off; uint64_t rtt; + c.get(off, rtt); + auto best_rtt_before = rtt; + auto best_off_before = off; + + // Fill 15 more samples with much larger RTT so the first stays best. + for (int i = 0; i < 15; ++i) { + uint64_t base = 1000ull + i * 100ull; + // Big asymmetry -> big RTT. + c.add_sample(base, base + 5000, base + 5100, base + 10000); + } + c.get(off, rtt); + REQUIRE(rtt == best_rtt_before); + REQUIRE(off == best_off_before); + + // Add one more -- the original best gets evicted (ring=16, this is the 17th). + // The new best should be from the remaining 16 samples. + c.add_sample(20'000, 25'000, 25'100, 30'000); + c.get(off, rtt); + REQUIRE(rtt != best_rtt_before); // the original is gone +} + +TEST_CASE("FrameMatcher: arrival then sidecar publishes once", + "[latency_probe][matcher]") { + lp::FrameMatcher m; + std::vector published; + m.set_publish_callback([&](const lp::FrameTimings& f){ published.push_back(f); }); + + constexpr uint32_t ssrc = 0xABCDu; + constexpr uint32_t rtp_ts = 90000u; + + m.on_marker_arrival(ssrc, rtp_ts, /*gs_recv_us=*/100'000, /*now=*/100'000); + // Sidecar arrives second — should trigger publish. + m.on_msg_frame(ssrc, rtp_ts, + /*capture_us=*/ 50'000, + /*frame_ready_us=*/ 90'000, + /*last_pkt_send_us=*/95'000, + /*now=*/130'000); + + REQUIRE(published.size() == 1); + REQUIRE(published[0].ssrc == ssrc); + REQUIRE(published[0].rtp_ts == rtp_ts); + REQUIRE(published[0].gs_recv_last_us == 100'000); + REQUIRE(published[0].capture_us == 50'000); + REQUIRE(published[0].frame_ready_us == 90'000); + REQUIRE(published[0].last_pkt_send_us == 95'000); +} + +TEST_CASE("FrameMatcher: sidecar before arrival also works", + "[latency_probe][matcher]") { + lp::FrameMatcher m; + std::vector published; + m.set_publish_callback([&](const lp::FrameTimings& f){ published.push_back(f); }); + + m.on_msg_frame(1u, 100u, 50'000, 90'000, 95'000, /*now=*/96'000); + REQUIRE(published.empty()); + m.on_marker_arrival(1u, 100u, 100'000, 100'000); + REQUIRE(published.size() == 1); + REQUIRE(published[0].gs_recv_last_us == 100'000); +} + +TEST_CASE("FrameMatcher: TTL evicts orphans", + "[latency_probe][matcher]") { + lp::FrameMatcher m; + std::vector published; + m.set_publish_callback([&](const lp::FrameTimings& f){ published.push_back(f); }); + + // Arrival without ever getting a MSG_FRAME — orphan. + m.on_marker_arrival(1u, 100u, 1'000, /*now=*/1'000); + // Sweep with strict-> eviction: need (now - inserted) > ttl, so + // 1us past the boundary at 500'000us-since-insert is the minimum. + m.ttl_sweep(/*now=*/501'001, /*ttl_us=*/500'000); + REQUIRE(m.size() == 0); + + // Subsequent frame completes normally. + m.on_marker_arrival(1u, 200u, 600'000, 600'000); + m.on_msg_frame(1u, 200u, 0, 599'000, 599'500, 603'000); + + REQUIRE(published.size() == 1); + REQUIRE(published[0].rtp_ts == 200u); +} + +TEST_CASE("FrameMatcher: ring cap discards oldest", + "[latency_probe][matcher]") { + lp::FrameMatcher m; + std::vector published; + m.set_publish_callback([&](const lp::FrameTimings& f){ published.push_back(f); }); + + // Push 200 arrivals — cap should hold at 64 (kRingCap). + for (uint32_t i = 0; i < 200; ++i) { + m.on_marker_arrival(1u, i, i * 1000ull, i * 1000ull); + } + REQUIRE(m.size() == lp::FrameMatcher::kRingCap); +} + +TEST_CASE("FrameMatcher: duplicate marker arrival is a no-op stamp", + "[latency_probe][matcher]") { + lp::FrameMatcher m; + std::vector published; + m.set_publish_callback([&](const lp::FrameTimings& f){ published.push_back(f); }); + + m.on_marker_arrival(1u, 100u, 1'000, 1'000); + // Duplicate marker for the same frame must not overwrite the recorded time. + m.on_marker_arrival(1u, 100u, 2'000, 2'000); + + m.on_msg_frame(1u, 100u, 0, 900, 950, 5'000); + + REQUIRE(published.size() == 1); + REQUIRE(published[0].gs_recv_last_us == 1'000); +} + +TEST_CASE("compute_and_publish: normal values flow through", + "[latency_probe][publish]") { + lp::PublishedFacts captured; + auto cb = [&](const char* name, uint64_t v) { + captured.uint_facts.emplace_back(name, v); + }; + auto cbi = [&](const char* name, int64_t v) { + captured.int_facts.emplace_back(name, v); + }; + + lp::FrameTimings f{}; + f.ssrc = 1; f.rtp_ts = 100; + f.capture_us = 1'000; + f.frame_ready_us = 11'000; + f.last_pkt_send_us = 14'000; + f.gs_recv_last_us = 14'500; + f.sidecar_seen = true; + + int64_t offset_us = -1'000; + uint64_t rtt_us = 400; + uint64_t gs_pipeline_ms = 35; // matches video.decode_and_handover_ms + uint64_t clamp_counter = 0; + lp::compute_and_publish(f, offset_us, rtt_us, gs_pipeline_ms, + clamp_counter, cb, cbi); + + auto get = [&](const char* name) -> uint64_t { + for (auto& [n, v] : captured.uint_facts) if (n == name) return v; + FAIL("missing fact " << name); + return 0; + }; + + REQUIRE(get("video.latency.capture_to_encode_ms") == 10); + REQUIRE(get("video.latency.capture_to_encode_us") == 10'000); + REQUIRE(get("video.latency.encode_to_send_ms") == 3); + REQUIRE(get("video.latency.encode_to_send_us") == 3'000); + REQUIRE(get("video.latency.wire_ms") == 0); + REQUIRE(clamp_counter == 1); + // total_ms = capture(10) + encode(3) + wire(0) + gs_pipeline(35) = 48 + REQUIRE(get("video.latency.total_ms") == 48); + REQUIRE(get("video.latency.clock_rtt_us") == 400); + REQUIRE(get("video.latency.wire_clamp_count") == 1); + + // decode_ms and display_ms were removed in the refactor. + for (auto& [n, _] : captured.uint_facts) { + REQUIRE(n != "video.latency.decode_ms"); + REQUIRE(n != "video.latency.display_ms"); + } + + bool found_offset = false; + for (auto& [n, v] : captured.int_facts) { + if (n == "video.latency.clock_offset_us") { + REQUIRE(v == -1'000); + found_offset = true; + } + } + REQUIRE(found_offset); +} + +TEST_CASE("compute_and_publish: capture_us == 0 skips capture_to_encode but still publishes total", + "[latency_probe][publish]") { + lp::PublishedFacts captured; + auto cb = [&](const char* n, uint64_t v){ captured.uint_facts.emplace_back(n, v); }; + auto cbi = [&](const char* n, int64_t v){ captured.int_facts.emplace_back(n, v); }; + + lp::FrameTimings f{}; + f.capture_us = 0; + f.frame_ready_us = 11'000; + f.last_pkt_send_us = 14'000; + f.gs_recv_last_us = 15'000; + f.sidecar_seen = true; + + uint64_t clamp = 0; + // rtt_us=1 (any nonzero) signals "sync acquired" — the focus of this + // test is the capture_us=0 branch, not the pre-sync gate. + lp::compute_and_publish(f, 0, 1, /*gs_pipeline_ms=*/40, clamp, cb, cbi); + + for (auto& [n, _] : captured.uint_facts) { + REQUIRE(n != "video.latency.capture_to_encode_ms"); + REQUIRE(n != "video.latency.capture_to_encode_us"); + } + bool saw_wire = false, saw_send_us = false, saw_total = false; + uint64_t total = 0; + for (auto& [n, v] : captured.uint_facts) { + if (n == "video.latency.wire_ms") saw_wire = true; + if (n == "video.latency.encode_to_send_us") saw_send_us = true; + if (n == "video.latency.total_ms") { saw_total = true; total = v; } + } + REQUIRE(saw_wire); + REQUIRE(saw_send_us); + REQUIRE(saw_total); + // total = encode(3) + wire(1) + gs_pipeline(40) = 44 + REQUIRE(total == 44); +} + +TEST_CASE("compute_and_publish: pre-sync (rtt_us=0) suppresses wire_ms and total_ms", + "[latency_probe][publish]") { + // Regression guard for the cold-start race: before the first + // MSG_SYNC_RESP lands, ClockOffset::get() returns (offset=0, rtt=0). + // Without a valid offset, gs_recv_last_us (GS CLOCK_MONOTONIC) minus + // last_pkt_send_us (drone CLOCK_MONOTONIC) is (GS_uptime - drone_uptime) + // — unbounded, often huge. Field symptom: total_ms publishes absurd + // values until sync converges. Guard against republishing this bug. + lp::PublishedFacts captured; + auto cb = [&](const char* n, uint64_t v){ captured.uint_facts.emplace_back(n, v); }; + auto cbi = [&](const char* n, int64_t v){ captured.int_facts.emplace_back(n, v); }; + + lp::FrameTimings f{}; + f.ssrc = 1; f.rtp_ts = 100; + f.capture_us = 1'000; // drone clock + f.frame_ready_us = 11'000; // drone clock + f.last_pkt_send_us = 14'000; // drone clock + // GS clock is millions of µs away from drone clock — the exact divergence + // we'd see if both machines have been up for different durations. + f.gs_recv_last_us = 9'999'999'000ull; + f.sidecar_seen = true; + + uint64_t clamp = 0; + lp::compute_and_publish(f, /*offset_us=*/0, /*rtt_us=*/0, + /*gs_pipeline_ms=*/35, clamp, cb, cbi); + + auto has_uint = [&](const char* name) { + for (auto& [n, _] : captured.uint_facts) if (n == name) return true; + return false; + }; + + // Drone-local segments are safe to publish without sync. + REQUIRE(has_uint("video.latency.capture_to_encode_ms")); + REQUIRE(has_uint("video.latency.capture_to_encode_us")); + REQUIRE(has_uint("video.latency.encode_to_send_ms")); + REQUIRE(has_uint("video.latency.encode_to_send_us")); + // Clock diagnostics still publish — seeing rtt=0 is itself signal. + REQUIRE(has_uint("video.latency.clock_rtt_us")); + REQUIRE(has_uint("video.latency.wire_clamp_count")); + + // wire_ms and total_ms MUST be suppressed pre-sync. + for (auto& [n, _] : captured.uint_facts) { + REQUIRE(n != "video.latency.wire_ms"); + REQUIRE(n != "video.latency.total_ms"); + } + // wire_clamp_counter must not tick on bogus pre-sync deltas. + REQUIRE(clamp == 0); +} + +TEST_CASE("wire: encode_subscribe round-trip header bytes", + "[latency_probe][wire]") { + uint8_t buf[lp::wire::kSizeSubscribe] = {}; + lp::encode_subscribe(buf); + REQUIRE(buf[0] == 0x52); + REQUIRE(buf[1] == 0x54); + REQUIRE(buf[2] == 0x50); + REQUIRE(buf[3] == 0x53); + REQUIRE(buf[4] == 1); // version + REQUIRE(buf[5] == 1); // msg_type = SUBSCRIBE +} + +TEST_CASE("wire: encode_sync_req carries t1 big-endian", + "[latency_probe][wire]") { + uint8_t buf[lp::wire::kSizeSyncReq] = {}; + lp::encode_sync_req(buf, 0x0102030405060708ull); + REQUIRE(buf[5] == 3); // SYNC_REQ + REQUIRE(buf[8] == 0x01); + REQUIRE(buf[9] == 0x02); + REQUIRE(buf[10] == 0x03); + REQUIRE(buf[11] == 0x04); + REQUIRE(buf[12] == 0x05); + REQUIRE(buf[13] == 0x06); + REQUIRE(buf[14] == 0x07); + REQUIRE(buf[15] == 0x08); +} + +TEST_CASE("wire: decode_message rejects bad magic / version / length", + "[latency_probe][wire]") { + uint8_t buf[64] = {}; + lp::SyncRespFields sr{}; + lp::MsgFrameFields mf{}; + REQUIRE(lp::decode_message(buf, 0, sr, mf) == 0); + buf[0]=0; buf[1]=0; buf[2]=0; buf[3]=0; buf[4]=1; buf[5]=4; + REQUIRE(lp::decode_message(buf, lp::wire::kSizeSyncResp, sr, mf) == 0); + buf[0]=0x52; buf[1]=0x54; buf[2]=0x50; buf[3]=0x53; buf[4]=99; buf[5]=4; + REQUIRE(lp::decode_message(buf, lp::wire::kSizeSyncResp, sr, mf) == 0); + // (d) null pointer + REQUIRE(lp::decode_message(nullptr, 32, sr, mf) == 0); + // (e) unknown msg_type + buf[0]=0x52; buf[1]=0x54; buf[2]=0x50; buf[3]=0x53; buf[4]=1; buf[5]=0x99; + REQUIRE(lp::decode_message(buf, sizeof(buf), sr, mf) == 0); + // (f) sub-message too short — valid magic+version+kMsgSyncResp but len < kSizeSyncResp + buf[0]=0x52; buf[1]=0x54; buf[2]=0x50; buf[3]=0x53; buf[4]=1; buf[5]=lp::wire::kMsgSyncResp; + REQUIRE(lp::decode_message(buf, lp::wire::kSizeSyncResp - 1, sr, mf) == 0); +} + +TEST_CASE("wire: decode_message reads SYNC_RESP fields big-endian", + "[latency_probe][wire]") { + uint8_t buf[lp::wire::kSizeSyncResp] = {}; + buf[0]=0x52; buf[1]=0x54; buf[2]=0x50; buf[3]=0x53; + buf[4]=1; buf[5]=lp::wire::kMsgSyncResp; + uint64_t t1 = 0xAABBCCDDEEFF0011ull; + for (int i = 0; i < 8; ++i) buf[8+i] = (t1 >> (56 - 8*i)) & 0xff; + buf[23] = 100; + buf[31] = 200; + + lp::SyncRespFields sr{}; + lp::MsgFrameFields mf{}; + REQUIRE(lp::decode_message(buf, sizeof(buf), sr, mf) == lp::wire::kMsgSyncResp); + REQUIRE(sr.t1_us == 0xAABBCCDDEEFF0011ull); + REQUIRE(sr.t2_us == 100); + REQUIRE(sr.t3_us == 200); +} + +TEST_CASE("wire: decode_message reads MSG_FRAME fields", + "[latency_probe][wire]") { + uint8_t buf[lp::wire::kSizeFrame] = {}; + buf[0]=0x52; buf[1]=0x54; buf[2]=0x50; buf[3]=0x53; + buf[4]=1; buf[5]=lp::wire::kMsgFrame; + uint32_t ssrc = 0xDEADBEEFu; + for (int i=0;i<4;++i) buf[8+i] = (ssrc >> (24-8*i)) & 0xff; + uint32_t rtp_ts = 0x11223344u; + for (int i=0;i<4;++i) buf[12+i] = (rtp_ts >> (24-8*i)) & 0xff; + auto write_be64 = [&](size_t off, uint64_t v) { + for (int i = 0; i < 8; ++i) buf[off + i] = (v >> (56 - 8 * i)) & 0xff; + }; + write_be64(24, 0xAABBCCDDEEFF1122ull); // frame_ready_us + write_be64(36, 0x0102030405060708ull); // capture_us + write_be64(44, 0xFEDCBA9876543210ull); // last_pkt_send_us + + lp::SyncRespFields sr{}; + lp::MsgFrameFields mf{}; + REQUIRE(lp::decode_message(buf, sizeof(buf), sr, mf) == lp::wire::kMsgFrame); + REQUIRE(mf.ssrc == ssrc); + REQUIRE(mf.rtp_timestamp == rtp_ts); + REQUIRE(mf.frame_ready_us == 0xAABBCCDDEEFF1122ull); + REQUIRE(mf.capture_us == 0x0102030405060708ull); + REQUIRE(mf.last_pkt_send_us == 0xFEDCBA9876543210ull); +} diff --git a/tests/test_latency_probe_integration.cpp b/tests/test_latency_probe_integration.cpp new file mode 100644 index 00000000..4670775d --- /dev/null +++ b/tests/test_latency_probe_integration.cpp @@ -0,0 +1,220 @@ +#include + +#include "../src/latency_probe.hpp" +#include "../src/latency_probe_wire.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace lp = latency_probe; + +namespace { + +struct FakeWaybeam { + int fd = -1; + uint16_t port = 0; + std::atomic stop{false}; + std::thread th; + std::atomic subscribe_count{0}; + std::atomic sync_responded{0}; + + struct FrameSpec { + uint32_t ssrc; + uint32_t rtp_ts; + uint64_t capture_us; + uint64_t frame_ready_us; + uint64_t last_pkt_send_us; + }; + std::vector frames; + std::atomic frames_emitted{0}; + + bool start_listening() { + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) return false; + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = 0; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (bind(fd, (sockaddr*)&addr, sizeof(addr)) != 0) { close(fd); fd=-1; return false; } + socklen_t alen = sizeof(addr); + getsockname(fd, (sockaddr*)&addr, &alen); + port = ntohs(addr.sin_port); + return true; + } + + void run() { + sockaddr_in peer{}; + socklen_t plen = sizeof(peer); + bool have_peer = false; + + while (!stop.load()) { + uint8_t buf[128]; + ssize_t n = recvfrom(fd, buf, sizeof(buf), MSG_DONTWAIT, + (sockaddr*)&peer, &plen); + if (n > 0) { + have_peer = true; + if (n >= (ssize_t)lp::wire::kSizeSubscribe && + buf[5] == lp::wire::kMsgSubscribe) { + subscribe_count++; + } + else if (n >= (ssize_t)lp::wire::kSizeSyncReq && + buf[5] == lp::wire::kMsgSyncReq) { + uint8_t out[lp::wire::kSizeSyncResp] = {}; + out[0]=0x52;out[1]=0x54;out[2]=0x50;out[3]=0x53; + out[4]=1; out[5]=lp::wire::kMsgSyncResp; + memcpy(out+8, buf+8, 8); + uint64_t t2 = 10'000'000ull, t3 = 10'000'100ull; + for (int i=0;i<8;++i){ out[16+i]=(t2>>(56-8*i))&0xff; } + for (int i=0;i<8;++i){ out[24+i]=(t3>>(56-8*i))&0xff; } + sendto(fd, out, sizeof(out), 0, (sockaddr*)&peer, plen); + sync_responded++; + } + } + + if (have_peer && frames_emitted < frames.size()) { + const auto& f = frames[frames_emitted++]; + uint8_t out[lp::wire::kSizeFrame] = {}; + out[0]=0x52;out[1]=0x54;out[2]=0x50;out[3]=0x53; + out[4]=1; out[5]=lp::wire::kMsgFrame; + auto wbe32=[&](size_t off, uint32_t v){ + out[off]=(v>>24)&0xff; out[off+1]=(v>>16)&0xff; + out[off+2]=(v>>8)&0xff; out[off+3]=v&0xff; + }; + auto wbe64=[&](size_t off, uint64_t v){ + for (int i=0;i<8;++i) out[off+i]=(v>>(56-8*i))&0xff; + }; + wbe32(8, f.ssrc); + wbe32(12, f.rtp_ts); + wbe64(24, f.frame_ready_us); + wbe64(36, f.capture_us); + wbe64(44, f.last_pkt_send_us); + sendto(fd, out, sizeof(out), 0, (sockaddr*)&peer, plen); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } +}; + +} // namespace + +TEST_CASE("latency_probe: integration loopback publishes facts", + "[latency_probe][integration]") { + FakeWaybeam fw; + REQUIRE(fw.start_listening()); + fw.frames = { + {1u, 100u, 1'000, 11'000, 14'000}, + {1u, 200u, 2'000, 12'000, 15'000}, + {1u, 300u, 3'000, 13'000, 16'000}, + }; + fw.th = std::thread([&]{ fw.run(); }); + + std::mutex captured_mu; + std::vector> uint_facts; + std::vector> int_facts; + lp::set_publish_overrides_for_test( + [&](const char* n, uint64_t v){ + std::lock_guard lk(captured_mu); uint_facts.emplace_back(n,v); + }, + [&](const char* n, int64_t v){ + std::lock_guard lk(captured_mu); int_facts.emplace_back(n,v); + }); + + REQUIRE(lp::start("127.0.0.1", fw.port)); + + // Wait for the first clock sync to land. compute_and_publish suppresses + // wire_ms / total_ms until the offset is valid (rtt_us > 0), so without + // this gate the first frames would publish only drone-local segments and + // the wire_ms / total_ms assertions below would race. The probe sends + // MSG_SYNC_REQ at startup; fake waybeam replies within a few ms on loopback. + { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (fw.sync_responded.load() == 0 && + std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + REQUIRE(fw.sync_responded.load() >= 1); + // Give the probe thread a beat to handle_packet the sync response. + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + + // Feed a synthetic GS-pipeline reading; total_ms uses this. + lp::record_gs_pipeline_ms(35); + + for (uint32_t i = 0; i < 3; ++i) { + uint32_t rtp_ts = 100u * (i + 1); + uint64_t base = lp::now_us(); + // null/empty input is a no-op (parser rejects). + lp::on_rtp_buffer(nullptr, 0, base); + // Synthetic 12-byte RTP header for the parser path. + uint8_t hdr[12] = {}; + hdr[0] = 0x80; // V=2 + hdr[1] = 0x80; // marker=1 + hdr[4]=(rtp_ts>>24)&0xff; hdr[5]=(rtp_ts>>16)&0xff; + hdr[6]=(rtp_ts>>8)&0xff; hdr[7]= rtp_ts &0xff; + hdr[8]=0; hdr[9]=0; hdr[10]=0; hdr[11]=1; // ssrc=1 + lp::on_rtp_buffer(hdr, sizeof(hdr), base); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(3); + while (std::chrono::steady_clock::now() < deadline) { + std::lock_guard lk(captured_mu); + size_t totals = 0; + for (auto& [n, _] : uint_facts) if (n == "video.latency.total_ms") totals++; + if (totals >= 3) break; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + lp::stop(); + lp::set_publish_overrides_for_test(nullptr, nullptr); + fw.stop = true; + if (fw.th.joinable()) fw.th.join(); + close(fw.fd); + + REQUIRE(fw.subscribe_count.load() >= 1); + REQUIRE(fw.sync_responded.load() >= 1); + + std::lock_guard lk(captured_mu); + size_t totals = 0; + for (auto& [n, _] : uint_facts) if (n == "video.latency.total_ms") totals++; + REQUIRE(totals >= 3); + + bool saw_offset = false; + for (auto& [n, v] : int_facts) { + if (n == "video.latency.clock_offset_us") { saw_offset = true; break; } + } + REQUIRE(saw_offset); + + // Verify the spec-required facts are present. Names match + // compute_and_publish's emit list after the refactor (decode_ms and + // display_ms were removed; total now sums via gs_pipeline_ms). + auto has_uint = [&](const char* name) { + for (auto& [n, _] : uint_facts) if (n == name) return true; + return false; + }; + REQUIRE(has_uint("video.latency.capture_to_encode_ms")); + REQUIRE(has_uint("video.latency.capture_to_encode_us")); + REQUIRE(has_uint("video.latency.encode_to_send_ms")); + REQUIRE(has_uint("video.latency.encode_to_send_us")); + REQUIRE(has_uint("video.latency.wire_ms")); + REQUIRE(has_uint("video.latency.total_ms")); + REQUIRE(has_uint("video.latency.clock_rtt_us")); + REQUIRE(has_uint("video.latency.wire_clamp_count")); + // clock_offset_us is signed (int_facts) — already covered by saw_offset above. + // decode_ms / display_ms were removed in the refactor; verify they're gone. + for (auto& [n, _] : uint_facts) { + REQUIRE(n != "video.latency.decode_ms"); + REQUIRE(n != "video.latency.display_ms"); + } +}