From 9a40776ddfa977ccd3ee1adbd6793e8ed05b8837 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Thu, 7 May 2026 19:41:27 -0400 Subject: [PATCH 1/2] Fix CLIENT-mode runlevel/sync and rec ring buffer wrap * getRunLevel() falls back to the SYSINFO mirror in shmem so CLIENT sessions report the device runlevel even without a fresh SYSREP packet flowing through their receive ring (the STANDALONE owner only sees SYSREPs during its handshake). * sync() CLIENT path uses getRunLevel() for the no-op runlevel set (previously sent runlevel=0 from the stale atomic) and waits specifically on cbPKTTYPE_SYSREPRUNLEV (0x12) via a sticky received_sysrepRunlev flag, so periodic 0x10 SYSREP heartbeats from nPlayServer can no longer falsely satisfy the wait. * writeToReceiveBuffer pads end-of-buffer wrap gaps with a synthetic chid=0/type=0/dlen!=0 marker so CLIENT readers can advance through the gap cleanly. Previously the reader had no way to detect the gap and would land inside it after the first wrap, decoding garbage as fake SYSREP packets and emitting spurious runlevel-change events. Adds release/acquire ordering on head_index for ARM weak-memory correctness. * pycbsdk regression tests in test_client_mode.py covering CLIENT runlevel, sync correctness, and packet integrity through a forced rec-buffer wrap. * simple_device.cpp: optional duration argument and prints standalone/protocol/proc-ident/runlevel after session creation. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/SimpleDevice/simple_device.cpp | 59 +++++++- pycbsdk/tests/test_client_mode.py | 170 ++++++++++++++++++++++++ src/cbsdk/include/cbsdk/sdk_session.h | 14 +- src/cbsdk/src/sdk_session.cpp | 69 +++++++--- src/cbshm/src/shmem_session.cpp | 84 ++++++++++-- 5 files changed, 360 insertions(+), 36 deletions(-) diff --git a/examples/SimpleDevice/simple_device.cpp b/examples/SimpleDevice/simple_device.cpp index 6f39a647..4a8b00c7 100644 --- a/examples/SimpleDevice/simple_device.cpp +++ b/examples/SimpleDevice/simple_device.cpp @@ -92,6 +92,23 @@ DeviceType parseDeviceType(const std::string& type_str) { std::exit(1); } +/// Map runlevel value to name +const char* getRunLevelName(uint32_t runlevel) { + switch (runlevel) { + case 0: return "UNKNOWN"; + case cbRUNLEVEL_STARTUP: return "STARTUP"; + case cbRUNLEVEL_HARDRESET: return "HARDRESET"; + case cbRUNLEVEL_STANDBY: return "STANDBY"; + case cbRUNLEVEL_RESET: return "RESET"; + case cbRUNLEVEL_RUNNING: return "RUNNING"; + case cbRUNLEVEL_STRESSED: return "STRESSED"; + case cbRUNLEVEL_ERROR: return "ERROR"; + case cbRUNLEVEL_SHUTDOWN: return "SHUTDOWN"; + case cbRUNLEVEL_UPDATE: return "UPDATE"; + default: return "?"; + } +} + /// Get device type name string const char* getDeviceTypeName(DeviceType type) { switch (type) { @@ -111,14 +128,23 @@ int main(int argc, char* argv[]) { std::cout << "================================================\n\n"; // Parse command line arguments + // simple_device [device_type] [duration_seconds] + // duration_seconds <= 0 means "run until Ctrl+C" (default). DeviceType device_type = DeviceType::LEGACY_NSP; // Default to NSP + int duration_seconds = 0; if (argc >= 2) { device_type = parseDeviceType(argv[1]); } + if (argc >= 3) { + duration_seconds = std::atoi(argv[2]); + } std::cout << "Configuration:\n"; std::cout << " Device Type: " << getDeviceTypeName(device_type) << "\n"; + if (duration_seconds > 0) { + std::cout << " Duration: " << duration_seconds << "s (auto-stop)\n"; + } // Register signal handler for clean shutdown signal(SIGINT, signalHandler); @@ -167,11 +193,30 @@ int main(int argc, char* argv[]) { std::cout << "Packet callback registered.\n\n"; + // Log runlevel transitions reported by the device. + session.registerRunlevelChangeCallback([](uint32_t rl) { + std::cout << "\n[runlevel change] -> " << rl + << " (" << getRunLevelName(rl) << ")\n"; + }); + // Step 4: Session is already running (auto-started by SDK) - std::cout << "SDK session is running and receiving packets...\n\n"; + std::cout << "SDK session is running and receiving packets...\n"; + std::cout << " Standalone: " << (session.isStandalone() ? "yes" : "no (CLIENT)") << "\n"; + std::cout << " Proto ver: 0x" << std::hex << session.getProtocolVersion() << std::dec << "\n"; + std::cout << " Proc ident: " << session.getProcIdent() << "\n"; + { + uint32_t rl = session.getRunLevel(); + std::cout << " Runlevel: " << rl << " (" << getRunLevelName(rl) << ")\n"; + } + std::cout << "\n"; // Step 5: Run for specified duration, showing statistics - std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n"; + if (duration_seconds > 0) { + std::cout << "Receiving packets for " << duration_seconds + << "s (or Ctrl+C to stop early)...\n\n"; + } else { + std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n"; + } std::cout << "Statistics (updated every second):\n"; std::cout << "-----------------------------------\n"; @@ -186,9 +231,17 @@ int main(int argc, char* argv[]) { auto now = std::chrono::steady_clock::now(); seconds_elapsed = std::chrono::duration_cast(now - start_time).count(); + if (duration_seconds > 0 && seconds_elapsed >= duration_seconds) { + g_running = false; + } + + uint32_t runlevel = session.getRunLevel(); + // Print statistics std::cout << "\r[" << std::setw(3) << seconds_elapsed << "s] " - << "Total: " << std::setw(8) << packet_count.load() + << "RL: " << std::setw(2) << runlevel << " (" << std::setw(9) << std::left + << getRunLevelName(runlevel) << std::right << ")" + << " | Total: " << std::setw(8) << packet_count.load() << " | Config: " << std::setw(6) << config_count.load() << " | Spikes: " << std::setw(8) << spike_count.load() << " | RX: " << std::setw(10) << stats.packets_received_from_device diff --git a/pycbsdk/tests/test_client_mode.py b/pycbsdk/tests/test_client_mode.py index ca9d4d9c..b1fe575b 100644 --- a/pycbsdk/tests/test_client_mode.py +++ b/pycbsdk/tests/test_client_mode.py @@ -20,6 +20,7 @@ SampleRate, Session, ) +from pycbsdk.session import RUNLEVEL_RUNNING pytestmark = pytest.mark.integration @@ -286,3 +287,172 @@ def test_device_time_nonzero(self, client_session): """CLIENT should read device time from shmem.""" t = client_session.time assert t > 0 + + +# --------------------------------------------------------------------------- +# Runlevel and sync in CLIENT mode +# --------------------------------------------------------------------------- + + +class TestClientRunlevelAndSync: + """Verify CLIENT-mode runlevel reporting and sync(). + + Regression coverage for two bugs: + + 1. CLIENT-mode ``getRunLevel()`` previously returned 0. The atomic it + reads only updates when a SYSREP packet flows through the CLIENT's + receive ring, but devices only emit SYSREP in response to runlevel + commands — so after the STANDALONE owner finished its handshake the + CLIENT's atomic stayed at 0 indefinitely. Fix: ``getRunLevel()`` + falls back to the SYSINFO mirror in shmem (which the STANDALONE + writes via ``setSysInfo`` on every SYSREP). + + 2. CLIENT-mode ``sync()`` previously waited on any 0x10..0x1F SYSREP, + so periodic SYSREP (0x10) heartbeats from nPlayServer could falsely + satisfy the wait before the actual SYSREPRUNLEV (0x12) reply + arrived. ``sync()`` could also send SYSSETRUNLEV with + ``runlevel=0`` because it read the per-session atomic instead of + ``getRunLevel()``. Fix: ``sync()`` uses ``getRunLevel()`` and + waits on a sticky ``received_sysrepRunlev`` flag set only on + type 0x12. + """ + + def test_runlevel_is_running(self, client_session): + """CLIENT must report RUNLEVEL_RUNNING (50), not 0.""" + assert client_session.runlevel == RUNLEVEL_RUNNING + + def test_runlevel_matches_standalone(self, nplay_session, client_session): + """STANDALONE and CLIENT must agree on the runlevel.""" + assert client_session.runlevel == nplay_session.runlevel + + def test_sync_does_not_timeout(self, client_session): + """sync() in CLIENT mode must complete within the timeout.""" + client_session.sync(timeout=2.0) + + def test_sync_after_setter_observes_new_state(self, client_session): + """Round-trip a CLIENT-issued setter through sync() and read back. + + If sync() were satisfied by a heartbeat (the pre-fix behavior) we + could observe stale state on the read-back, since the actual + SYSREPRUNLEV reply might still be in flight along with the + device-applied CHANSET acknowledgments. This test exercises the + full happy path: configure → sync → read. + """ + client_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_1kHz, + disable_others=True, + ) + client_session.sync(timeout=2.0) + # The synced state must be visible. + ids = client_session.get_matching_channel_ids( + ChannelType.FRONTEND, n_chans=N_CHANS, + ) + smpgroups = client_session.get_channels_field( + ChannelType.FRONTEND, ChanInfoField.SMPGROUP, N_CHANS, + ) + assert len(ids) == N_CHANS + assert all(g == int(SampleRate.SR_1kHz) for g in smpgroups), smpgroups + # Restore for any later tests sharing the nplay_session. + client_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=True, + ) + client_session.sync(timeout=2.0) + + def test_runlevel_stable_during_steady_state(self, client_session): + """Runlevel must remain RUNNING across reads and not flicker. + + Before the wrap-marker fix in the rec ring buffer, the CLIENT + would observe garbage SYSREP-family packets after the first + buffer wrap, firing spurious runlevel-change events. This test + is a lightweight check that runlevel is stable; the deeper wrap + regression test is :class:`TestClientBufferWrapRegression`. + """ + for _ in range(5): + assert client_session.runlevel == RUNLEVEL_RUNNING + time.sleep(0.05) + + +# --------------------------------------------------------------------------- +# Buffer-wrap regression test +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def client_session_full_rate(nplay_session): + """CLIENT session with all FRONTEND channels enabled at 30 kHz. + + Used to drive enough data through the rec ring buffer to force at + least one wrap during the test. Each group packet is ~528 bytes + (256 ch × int16 + header), so at 30 kHz that's ~15.84 MB/s; the + rec buffer is ~268 MB and wraps in ~17 s. + """ + n_fe = nplay_session.num_fe_chans() + nplay_session.set_sample_group( + n_fe, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=False, + ) + nplay_session.sync(timeout=2.0) + + with Session(DeviceType.NPLAY) as client: + time.sleep(1) + yield client + + # Restore the small-channel default for sibling tests. + nplay_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=True, + ) + nplay_session.sync(timeout=2.0) + + +class TestClientBufferWrapRegression: + """Verify CLIENT data integrity across a rec ring buffer wrap. + + Regression test for the wrap-marker fix in + ``cbshm/src/shmem_session.cpp``. Before the fix, when the writer + wrapped the rec ring buffer it left a 1+ dword gap that the reader + had no way to detect — the CLIENT would read garbage as fake + packets after the first wrap, manifesting as out-of-range + ``chid`` / ``type`` values and spurious runlevel-change events. + + The test runs long enough (~20 s) to guarantee at least one wrap + at the full ~15.84 MB/s rate, then asserts that no malformed + packets reached the user callback and that the runlevel is still + correct. + """ + + def test_no_malformed_packets_through_wrap(self, client_session_full_rate): + """No malformed packets must reach a user catch-all callback.""" + # Cache outside the hot callback path. + max_chans = client_session_full_rate.max_chans() + bad = [] + good_count = [0] + + @client_session_full_rate.on_packet() + def on_pkt(header, data): + good_count[0] += 1 + # Real packets have either chid == 0 (group sample) or chid in + # 1..max_chans (events / chaninfo) or chid == 0x8000 + # (configuration channel). The high byte of `type` is always 0 + # in the current protocol; a non-zero high byte indicates the + # bytes were misinterpreted as a header. + if (header.type & 0xFF00) != 0: + bad.append(("type-high-bits", int(header.type), int(header.chid))) + elif header.chid != 0 and header.chid != 0x8000: + if header.chid > max_chans: + bad.append(("chid-out-of-range", int(header.chid), + int(header.type))) + + # Run long enough to force at least one wrap of the ~268 MB rec + # buffer at ~15.84 MB/s. 20 s gives a comfortable margin. + time.sleep(20) + + assert good_count[0] > 0, "CLIENT received no packets" + assert not bad, ( + f"CLIENT received {len(bad)} malformed packet(s) after a buffer " + f"wrap (first 5: {bad[:5]}). This usually means the rec ring " + f"buffer wrap-marker handling regressed." + ) + # Runlevel must still be sane after going through wraps. + assert client_session_full_rate.runlevel == RUNLEVEL_RUNNING diff --git a/src/cbsdk/include/cbsdk/sdk_session.h b/src/cbsdk/include/cbsdk/sdk_session.h index 3f518a00..089df375 100644 --- a/src/cbsdk/include/cbsdk/sdk_session.h +++ b/src/cbsdk/include/cbsdk/sdk_session.h @@ -842,19 +842,23 @@ class SdkSession { /// Wait for SYSREP packet (helper for handshaking) /// @param timeout_ms Timeout in milliseconds - /// @param expected_runlevel Expected runlevel (0 = any SYSREP) - /// @return true if SYSREP received with expected runlevel, false if timeout - bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0) const; + /// @param expected_runlevel Expected runlevel (0 = any runlevel) + /// @param expected_type Expected SYSREP packet type (0 = any 0x10..0x1F) + /// @return true if matching SYSREP received, false if timeout + bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0, + uint16_t expected_type = 0) const; /// Send a runlevel command packet to the device (internal version with wait_for_runlevel) /// @param runlevel Desired runlevel (cbRUNLEVEL_*) /// @param resetque Channel for reset to queue on /// @param runflags Lock recording after reset - /// @param wait_for_runlevel Runlevel to wait for (0 = any SYSREP) + /// @param wait_for_runlevel Runlevel to wait for (0 = any runlevel) /// @param timeout_ms Timeout in milliseconds + /// @param expected_type Expected SYSREP type to wait for (0 = any 0x10..0x1F) /// @return Result indicating success or error Result setSystemRunLevel(uint32_t runlevel, uint32_t resetque, uint32_t runflags, - uint32_t wait_for_runlevel, uint32_t timeout_ms); + uint32_t wait_for_runlevel, uint32_t timeout_ms, + uint16_t expected_type = 0); /// Request configuration with custom timeout (internal version) /// @param timeout_ms Timeout in milliseconds diff --git a/src/cbsdk/src/sdk_session.cpp b/src/cbsdk/src/sdk_session.cpp index 2a4d2cf8..2e48e3ff 100644 --- a/src/cbsdk/src/sdk_session.cpp +++ b/src/cbsdk/src/sdk_session.cpp @@ -233,6 +233,11 @@ struct SdkSession::Impl { // Handshake state (for performStartupHandshake) std::atomic device_runlevel{0}; std::atomic received_sysrep{false}; + // Sticky flag set when a SYSREPRUNLEV (0x12) is observed since the last + // reset. Stays true even if a later SYSREP (0x10) heartbeat arrives, so + // sync()'s wait can't be raced by a heartbeat clobbering a "last type" + // field. Reset alongside received_sysrep before each send. + std::atomic received_sysrepRunlev{false}; std::mutex handshake_mutex; std::condition_variable handshake_cv; @@ -892,6 +897,9 @@ Result SdkSession::start() { if ((pkt.cbpkt_header.type & 0xF0) == cbPKTTYPE_SYSREP) { const auto* sysinfo = reinterpret_cast(&pkt); impl->updateRunlevel(sysinfo->runlevel); + if (pkt.cbpkt_header.type == cbPKTTYPE_SYSREPRUNLEV) { + impl->received_sysrepRunlev.store(true, std::memory_order_release); + } impl->received_sysrep.store(true, std::memory_order_release); impl->handshake_cv.notify_all(); } @@ -1203,6 +1211,9 @@ Result SdkSession::start() { if ((packets[i].cbpkt_header.type & 0xF0) == cbPKTTYPE_SYSREP) { const auto* sysinfo = reinterpret_cast(&packets[i]); impl->updateRunlevel(sysinfo->runlevel); + if (packets[i].cbpkt_header.type == cbPKTTYPE_SYSREPRUNLEV) { + impl->received_sysrepRunlev.store(true, std::memory_order_release); + } impl->received_sysrep.store(true, std::memory_order_release); impl->handshake_cv.notify_all(); } @@ -1486,7 +1497,13 @@ const cbPKT_FILTINFO* SdkSession::getFilterInfo(const uint32_t filter_id) const } uint32_t SdkSession::getRunLevel() const { - return m_impl->device_runlevel.load(std::memory_order_acquire); + uint32_t rl = m_impl->device_runlevel.load(std::memory_order_acquire); + if (rl != 0) return rl; + // Fall back to the SYSINFO mirrored into shmem by the STANDALONE owner. + // In CLIENT mode the device only emits SYSREP on runlevel-set commands, + // so this session's receive ring may never see one in steady state. + if (const auto* si = getSysInfo()) return si->runlevel; + return 0; } bool SdkSession::isStandalone() const { @@ -2235,20 +2252,26 @@ Result SdkSession::sync(uint32_t timeout_ms) { // Send a no-op runlevel SET (current runlevel) as a sync barrier. // The device processes packets in order, so the resulting SYSREPRUNLEV // confirms all prior configuration packets have been applied. - uint32_t current = m_impl->device_runlevel.load(std::memory_order_acquire); + // + // Use getRunLevel() (not the raw atomic): in CLIENT mode the SYSREP + // ring may be empty (the STANDALONE owner already handshook before we + // attached), so the atomic stays at 0. getRunLevel() falls back to the + // SYSINFO mirror in shmem. Sending runlevel=0 here would risk + // perturbing device state, so it must be the real current runlevel. + uint32_t current = getRunLevel(); Result result = Result::error("No session available"); if (m_impl->device_session) { // STANDALONE: use DeviceSession::setSystemRunLevelSync which matches // the specific SYSREPRUNLEV (type 0x12) response via sendAndWait. - // The old boolean waitForSysrep path matched any SYSREP (0x10-0x1F) - // and was falsely triggered by periodic SYSREP (0x10) heartbeats - // from nPlayServer. result = m_impl->device_session->setSystemRunLevelSync( current, 0, 0, std::chrono::milliseconds(timeout_ms)); } else { - // CLIENT mode: route through shmem (uses the boolean waitForSysrep path) - result = setSystemRunLevel(current, 0, 0, 0, timeout_ms); + // CLIENT: route through shmem and wait for SYSREPRUNLEV (0x12) + // specifically — periodic SYSREP heartbeats (0x10) from + // nPlayServer would otherwise falsely satisfy the wait. + result = setSystemRunLevel(current, 0, 0, 0, timeout_ms, + cbPKTTYPE_SYSREPRUNLEV); } if (result.isError()) return result; @@ -2256,7 +2279,7 @@ Result SdkSession::sync(uint32_t timeout_ms) { // The device also sends SYSREP on network error / reset. If the // returned runlevel is HARDRESET or STANDBY the device dropped our // config packets and restarted. - uint32_t rl = m_impl->device_runlevel.load(std::memory_order_acquire); + uint32_t rl = getRunLevel(); if (rl <= cbRUNLEVEL_STANDBY) { return Result::error( "Device reset during configuration (runlevel " + std::to_string(rl) + ")"); @@ -2401,20 +2424,25 @@ Result SdkSession::sendPacket(const cbPKT_GENERIC& pkt) { /// Helper: Wait for SYSREP packet ///-------------------------------------------------------------------------------------------- -bool SdkSession::waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel) const { - // Wait for SYSREP packet with optional expected runlevel - // If expected_runlevel is 0, accept any SYSREP - // If expected_runlevel is non-zero, wait for that specific runlevel +bool SdkSession::waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel, + uint16_t expected_type) const { + // Wait for SYSREP packet, optionally filtered by type and/or runlevel. + // expected_type == 0: any 0x10..0x1F packet + // expected_type == cbPKTTYPE_SYSREPRUNLEV (0x12): + // requires the sticky 0x12 flag + // expected_runlevel == 0: accept any runlevel std::unique_lock lock(m_impl->handshake_mutex); return m_impl->handshake_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), - [this, expected_runlevel] { - if (!m_impl->received_sysrep.load(std::memory_order_acquire)) { - return false; // Haven't received SYSREP yet + [this, expected_runlevel, expected_type] { + if (expected_type == cbPKTTYPE_SYSREPRUNLEV) { + if (!m_impl->received_sysrepRunlev.load(std::memory_order_acquire)) + return false; + } else if (!m_impl->received_sysrep.load(std::memory_order_acquire)) { + return false; } if (expected_runlevel == 0) { - return true; // Accept any SYSREP + return true; } - // Check if runlevel matches return m_impl->device_runlevel.load(std::memory_order_acquire) == expected_runlevel; }); } @@ -2428,9 +2456,11 @@ Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, } Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, uint32_t runflags, - uint32_t wait_for_runlevel, uint32_t timeout_ms) { + uint32_t wait_for_runlevel, uint32_t timeout_ms, + uint16_t expected_type) { // Reset handshake state before sending m_impl->received_sysrep.store(false, std::memory_order_relaxed); + m_impl->received_sysrepRunlev.store(false, std::memory_order_relaxed); // Send the runlevel packet Result send_result = Result::error("No session available"); @@ -2452,7 +2482,7 @@ Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, return send_result; // Wait for SYSREP response - if (!waitForSysrep(timeout_ms, wait_for_runlevel)) { + if (!waitForSysrep(timeout_ms, wait_for_runlevel, expected_type)) { if (wait_for_runlevel != 0) return Result::error("No SYSREP response with expected runlevel " + std::to_string(wait_for_runlevel)); return Result::error("No SYSREP response received for setSystemRunLevel"); @@ -2500,6 +2530,7 @@ Result SdkSession::performStartupHandshake(uint32_t timeout_ms) { // Reset handshake state m_impl->received_sysrep.store(false, std::memory_order_relaxed); + m_impl->received_sysrepRunlev.store(false, std::memory_order_relaxed); m_impl->device_runlevel.store(0, std::memory_order_relaxed); // Quick presence check - use shorter timeout to fail fast for non-existent devices diff --git a/src/cbshm/src/shmem_session.cpp b/src/cbshm/src/shmem_session.cpp index 4c1ceaae..a6af3ccb 100644 --- a/src/cbshm/src/shmem_session.cpp +++ b/src/cbshm/src/shmem_session.cpp @@ -299,7 +299,23 @@ struct ShmemSession::Impl { is_open = false; } - /// @brief Write a packet to the receive buffer ring + /// @brief Write a packet to the receive buffer ring. + /// + /// Cross-process publication: the consumer (CLIENT) reads head_index and + /// then reads packet bytes up to that index. On weak memory ordering + /// architectures (e.g. ARM/Apple Silicon) the head_index update must be + /// release-ordered with respect to the preceding memcpy and wrap update, + /// otherwise the consumer can observe an advanced head_index but stale + /// or partial bytes — which manifests as misaligned packets. + /// + /// Wrap padding: when a packet would not fit at @c head, the writer wraps + /// to offset 0. The skipped bytes between the previous packet and + /// @c buflen would otherwise leave the consumer's tail stranded inside + /// random gap data after the wrap. We pad the gap with a synthetic "wrap + /// marker" packet (chid=0, type=0, non-zero dlen) so the consumer can + /// advance tail through it cleanly and drop into the wrap. Wrap policy + /// also ensures the gap left after each write is either 0 or large + /// enough (>= cbPKT_HEADER_32SIZE) to fit a marker. Result writeToReceiveBuffer(const cbPKT_GENERIC& pkt) { if (!rec_buffer_raw) { return Result::error("Receive buffer not initialized"); @@ -312,20 +328,50 @@ struct ShmemSession::Impl { } uint32_t head = recHeadindex(); + uint32_t* buf = recBuffer(); - if (head + pkt_size_words > rec_buffer_len) { + // Decide whether to wrap. Wrap if either (a) the packet would not + // fit, or (b) writing it would leave a 1..3 dword tail gap that we + // cannot mark with a wrap-marker header on the next wrap. + const uint32_t end_after = head + pkt_size_words; + bool need_wrap = false; + if (end_after > rec_buffer_len) { + need_wrap = true; + } else if (end_after < rec_buffer_len && + (rec_buffer_len - end_after) < cbPKT_HEADER_32SIZE) { + need_wrap = true; + } + + if (need_wrap) { + // Pad the gap [head, buflen) with a wrap marker so the consumer + // can step over it. By the wrap-policy invariant above, gap is + // either 0 or >= cbPKT_HEADER_32SIZE. + uint32_t gap_dwords = rec_buffer_len - head; + if (gap_dwords >= cbPKT_HEADER_32SIZE) { + cbPKT_HEADER marker{}; + marker.time = 0; + marker.chid = 0; + marker.type = 0; + marker.dlen = static_cast(gap_dwords - cbPKT_HEADER_32SIZE); + marker.instrument = 0; + marker.reserved = 0; + std::memcpy(&buf[head], &marker, sizeof(cbPKT_HEADER)); + } head = 0; - recHeadwrap()++; + __atomic_store_n(&recHeadwrap(), recHeadwrap() + 1, __ATOMIC_RELAXED); } - uint32_t* buf = recBuffer(); const uint32_t* pkt_data = reinterpret_cast(&pkt); std::memcpy(&buf[head], pkt_data, pkt_size_words * sizeof(uint32_t)); - recHeadindex() = head + pkt_size_words; recReceived()++; recLasttime() = pkt.cbpkt_header.time; + // Release fence: head_index store synchronizes-with the consumer's + // acquire load, ensuring all prior writes (marker, memcpy, wrap, + // lasttime) are visible before the consumer sees the new head_index. + __atomic_store_n(&recHeadindex(), head + pkt_size_words, __ATOMIC_RELEASE); + return Result::ok(); } @@ -505,9 +551,11 @@ struct ShmemSession::Impl { // In CLIENT mode, sync our read position to the current head so we only // read NEW packets, not stale data that was already in the ring buffer. + // Use acquire load on head_index to pair with the producer's release + // store (see writeToReceiveBuffer). if (mode == Mode::CLIENT) { - rec_tailindex = recHeadindex(); - rec_tailwrap = recHeadwrap(); + rec_tailindex = __atomic_load_n(&recHeadindex(), __ATOMIC_ACQUIRE); + rec_tailwrap = __atomic_load_n(&recHeadwrap(), __ATOMIC_RELAXED); } // Detect protocol version for CENTRAL_COMPAT mode @@ -1843,8 +1891,12 @@ Result ShmemSession::readReceiveBuffer(cbPKT_GENERIC* packets, size_t max_ uint32_t* buf = m_impl->recBuffer(); uint32_t buflen = m_impl->rec_buffer_len; - uint32_t head_index = m_impl->recHeadindex(); - uint32_t head_wrap = m_impl->recHeadwrap(); + // Acquire-load: pairs with the producer's release-store of head_index in + // writeToReceiveBuffer. Without this, on weak memory architectures + // (ARM/Apple Silicon) we can observe an advanced head_index but stale or + // partial packet bytes, leading to misaligned reads of the ring buffer. + uint32_t head_index = __atomic_load_n(&m_impl->recHeadindex(), __ATOMIC_ACQUIRE); + uint32_t head_wrap = __atomic_load_n(&m_impl->recHeadwrap(), __ATOMIC_RELAXED); if (m_impl->rec_tailwrap == head_wrap && m_impl->rec_tailindex == head_index) { return Result::ok(); @@ -1905,6 +1957,20 @@ Result ShmemSession::readReceiveBuffer(cbPKT_GENERIC* packets, size_t max_ raw_header_32size = cbPKT_HEADER_32SIZE; // 4 auto* hdr = reinterpret_cast(&buf[m_impl->rec_tailindex]); raw_dlen = hdr->dlen; + + // Wrap-marker packet inserted by the writer to fill the unused + // gap before a wrap-around (chid=0, type=0, dlen != 0). Skip + // silently — advance tail past the marker without reporting it + // to the caller so it never reaches user callbacks. + if (hdr->chid == 0 && hdr->type == 0 && hdr->dlen != 0) { + uint32_t marker_size = cbPKT_HEADER_32SIZE + hdr->dlen; + m_impl->rec_tailindex += marker_size; + if (m_impl->rec_tailindex >= buflen) { + m_impl->rec_tailindex -= buflen; + m_impl->rec_tailwrap++; + } + continue; + } } uint32_t pkt_size_dwords = raw_header_32size + raw_dlen; From 5319b7482c0aef4096436acb30eea4c5ad182636 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Thu, 7 May 2026 20:56:59 -0400 Subject: [PATCH 2/2] Fix MSVC build: portable shmem ring-buffer atomic helpers The release/acquire ordering on rec ring buffer head_index/head_wrap used GCC's __atomic_* builtins, which MSVC doesn't expose. Wrap the four call sites in small inline helpers that branch between __atomic_* on GCC/Clang and std::atomic_thread_fence + volatile load/store on MSVC. Pre-existing __atomic_* uses in the xmt buffer were already #ifdef _WIN32-guarded with InterlockedExchange, so this just brings the rec-buffer additions to parity. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cbshm/src/shmem_session.cpp | 59 +++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/src/cbshm/src/shmem_session.cpp b/src/cbshm/src/shmem_session.cpp index a6af3ccb..ea26eb5d 100644 --- a/src/cbshm/src/shmem_session.cpp +++ b/src/cbshm/src/shmem_session.cpp @@ -29,11 +29,58 @@ #include #include #include +#include #include #include // std::gcd namespace cbshm { +namespace { + +// Cross-process publication helpers for the shmem ring buffer head/tail +// fields. The underlying memory lives in a shared mmap'd region (not in a +// std::atomic<>), so we use compiler-specific load/store + memory_order +// semantics. GCC/Clang have __atomic_*; MSVC needs the Interlocked +// intrinsics, with a thread fence to provide acquire ordering on ARM64 +// (on x86/x64 aligned loads/stores already imply acquire/release). +inline void shm_store_release_u32(uint32_t* p, uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + __atomic_store_n(p, v, __ATOMIC_RELEASE); +#else + // MSVC: ensure prior writes complete before publishing v. + std::atomic_thread_fence(std::memory_order_release); + *reinterpret_cast(p) = v; +#endif +} + +inline void shm_store_relaxed_u32(uint32_t* p, uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + __atomic_store_n(p, v, __ATOMIC_RELAXED); +#else + *reinterpret_cast(p) = v; +#endif +} + +inline uint32_t shm_load_acquire_u32(const uint32_t* p) { +#if defined(__GNUC__) || defined(__clang__) + return __atomic_load_n(p, __ATOMIC_ACQUIRE); +#else + uint32_t v = *reinterpret_cast(p); + std::atomic_thread_fence(std::memory_order_acquire); + return v; +#endif +} + +inline uint32_t shm_load_relaxed_u32(const uint32_t* p) { +#if defined(__GNUC__) || defined(__clang__) + return __atomic_load_n(p, __ATOMIC_RELAXED); +#else + return *reinterpret_cast(p); +#endif +} + +} // namespace + /////////////////////////////////////////////////////////////////////////////////////////////////// /// @brief Platform-specific implementation details (Pimpl idiom) /// @@ -358,7 +405,7 @@ struct ShmemSession::Impl { std::memcpy(&buf[head], &marker, sizeof(cbPKT_HEADER)); } head = 0; - __atomic_store_n(&recHeadwrap(), recHeadwrap() + 1, __ATOMIC_RELAXED); + shm_store_relaxed_u32(&recHeadwrap(), recHeadwrap() + 1); } const uint32_t* pkt_data = reinterpret_cast(&pkt); @@ -370,7 +417,7 @@ struct ShmemSession::Impl { // Release fence: head_index store synchronizes-with the consumer's // acquire load, ensuring all prior writes (marker, memcpy, wrap, // lasttime) are visible before the consumer sees the new head_index. - __atomic_store_n(&recHeadindex(), head + pkt_size_words, __ATOMIC_RELEASE); + shm_store_release_u32(&recHeadindex(), head + pkt_size_words); return Result::ok(); } @@ -554,8 +601,8 @@ struct ShmemSession::Impl { // Use acquire load on head_index to pair with the producer's release // store (see writeToReceiveBuffer). if (mode == Mode::CLIENT) { - rec_tailindex = __atomic_load_n(&recHeadindex(), __ATOMIC_ACQUIRE); - rec_tailwrap = __atomic_load_n(&recHeadwrap(), __ATOMIC_RELAXED); + rec_tailindex = shm_load_acquire_u32(&recHeadindex()); + rec_tailwrap = shm_load_relaxed_u32(&recHeadwrap()); } // Detect protocol version for CENTRAL_COMPAT mode @@ -1895,8 +1942,8 @@ Result ShmemSession::readReceiveBuffer(cbPKT_GENERIC* packets, size_t max_ // writeToReceiveBuffer. Without this, on weak memory architectures // (ARM/Apple Silicon) we can observe an advanced head_index but stale or // partial packet bytes, leading to misaligned reads of the ring buffer. - uint32_t head_index = __atomic_load_n(&m_impl->recHeadindex(), __ATOMIC_ACQUIRE); - uint32_t head_wrap = __atomic_load_n(&m_impl->recHeadwrap(), __ATOMIC_RELAXED); + uint32_t head_index = shm_load_acquire_u32(&m_impl->recHeadindex()); + uint32_t head_wrap = shm_load_relaxed_u32(&m_impl->recHeadwrap()); if (m_impl->rec_tailwrap == head_wrap && m_impl->rec_tailindex == head_index) { return Result::ok();