diff --git a/examples/SimpleDevice/simple_device.cpp b/examples/SimpleDevice/simple_device.cpp index 6f39a647..4a8b00c7 100644 --- a/examples/SimpleDevice/simple_device.cpp +++ b/examples/SimpleDevice/simple_device.cpp @@ -92,6 +92,23 @@ DeviceType parseDeviceType(const std::string& type_str) { std::exit(1); } +/// Map runlevel value to name +const char* getRunLevelName(uint32_t runlevel) { + switch (runlevel) { + case 0: return "UNKNOWN"; + case cbRUNLEVEL_STARTUP: return "STARTUP"; + case cbRUNLEVEL_HARDRESET: return "HARDRESET"; + case cbRUNLEVEL_STANDBY: return "STANDBY"; + case cbRUNLEVEL_RESET: return "RESET"; + case cbRUNLEVEL_RUNNING: return "RUNNING"; + case cbRUNLEVEL_STRESSED: return "STRESSED"; + case cbRUNLEVEL_ERROR: return "ERROR"; + case cbRUNLEVEL_SHUTDOWN: return "SHUTDOWN"; + case cbRUNLEVEL_UPDATE: return "UPDATE"; + default: return "?"; + } +} + /// Get device type name string const char* getDeviceTypeName(DeviceType type) { switch (type) { @@ -111,14 +128,23 @@ int main(int argc, char* argv[]) { std::cout << "================================================\n\n"; // Parse command line arguments + // simple_device [device_type] [duration_seconds] + // duration_seconds <= 0 means "run until Ctrl+C" (default). DeviceType device_type = DeviceType::LEGACY_NSP; // Default to NSP + int duration_seconds = 0; if (argc >= 2) { device_type = parseDeviceType(argv[1]); } + if (argc >= 3) { + duration_seconds = std::atoi(argv[2]); + } std::cout << "Configuration:\n"; std::cout << " Device Type: " << getDeviceTypeName(device_type) << "\n"; + if (duration_seconds > 0) { + std::cout << " Duration: " << duration_seconds << "s (auto-stop)\n"; + } // Register signal handler for clean shutdown signal(SIGINT, signalHandler); @@ -167,11 +193,30 @@ int main(int argc, char* argv[]) { std::cout << "Packet callback registered.\n\n"; + // Log runlevel transitions reported by the device. + session.registerRunlevelChangeCallback([](uint32_t rl) { + std::cout << "\n[runlevel change] -> " << rl + << " (" << getRunLevelName(rl) << ")\n"; + }); + // Step 4: Session is already running (auto-started by SDK) - std::cout << "SDK session is running and receiving packets...\n\n"; + std::cout << "SDK session is running and receiving packets...\n"; + std::cout << " Standalone: " << (session.isStandalone() ? "yes" : "no (CLIENT)") << "\n"; + std::cout << " Proto ver: 0x" << std::hex << session.getProtocolVersion() << std::dec << "\n"; + std::cout << " Proc ident: " << session.getProcIdent() << "\n"; + { + uint32_t rl = session.getRunLevel(); + std::cout << " Runlevel: " << rl << " (" << getRunLevelName(rl) << ")\n"; + } + std::cout << "\n"; // Step 5: Run for specified duration, showing statistics - std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n"; + if (duration_seconds > 0) { + std::cout << "Receiving packets for " << duration_seconds + << "s (or Ctrl+C to stop early)...\n\n"; + } else { + std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n"; + } std::cout << "Statistics (updated every second):\n"; std::cout << "-----------------------------------\n"; @@ -186,9 +231,17 @@ int main(int argc, char* argv[]) { auto now = std::chrono::steady_clock::now(); seconds_elapsed = std::chrono::duration_cast(now - start_time).count(); + if (duration_seconds > 0 && seconds_elapsed >= duration_seconds) { + g_running = false; + } + + uint32_t runlevel = session.getRunLevel(); + // Print statistics std::cout << "\r[" << std::setw(3) << seconds_elapsed << "s] " - << "Total: " << std::setw(8) << packet_count.load() + << "RL: " << std::setw(2) << runlevel << " (" << std::setw(9) << std::left + << getRunLevelName(runlevel) << std::right << ")" + << " | Total: " << std::setw(8) << packet_count.load() << " | Config: " << std::setw(6) << config_count.load() << " | Spikes: " << std::setw(8) << spike_count.load() << " | RX: " << std::setw(10) << stats.packets_received_from_device diff --git a/pycbsdk/tests/test_client_mode.py b/pycbsdk/tests/test_client_mode.py index ca9d4d9c..b1fe575b 100644 --- a/pycbsdk/tests/test_client_mode.py +++ b/pycbsdk/tests/test_client_mode.py @@ -20,6 +20,7 @@ SampleRate, Session, ) +from pycbsdk.session import RUNLEVEL_RUNNING pytestmark = pytest.mark.integration @@ -286,3 +287,172 @@ def test_device_time_nonzero(self, client_session): """CLIENT should read device time from shmem.""" t = client_session.time assert t > 0 + + +# --------------------------------------------------------------------------- +# Runlevel and sync in CLIENT mode +# --------------------------------------------------------------------------- + + +class TestClientRunlevelAndSync: + """Verify CLIENT-mode runlevel reporting and sync(). + + Regression coverage for two bugs: + + 1. CLIENT-mode ``getRunLevel()`` previously returned 0. The atomic it + reads only updates when a SYSREP packet flows through the CLIENT's + receive ring, but devices only emit SYSREP in response to runlevel + commands — so after the STANDALONE owner finished its handshake the + CLIENT's atomic stayed at 0 indefinitely. Fix: ``getRunLevel()`` + falls back to the SYSINFO mirror in shmem (which the STANDALONE + writes via ``setSysInfo`` on every SYSREP). + + 2. CLIENT-mode ``sync()`` previously waited on any 0x10..0x1F SYSREP, + so periodic SYSREP (0x10) heartbeats from nPlayServer could falsely + satisfy the wait before the actual SYSREPRUNLEV (0x12) reply + arrived. ``sync()`` could also send SYSSETRUNLEV with + ``runlevel=0`` because it read the per-session atomic instead of + ``getRunLevel()``. Fix: ``sync()`` uses ``getRunLevel()`` and + waits on a sticky ``received_sysrepRunlev`` flag set only on + type 0x12. + """ + + def test_runlevel_is_running(self, client_session): + """CLIENT must report RUNLEVEL_RUNNING (50), not 0.""" + assert client_session.runlevel == RUNLEVEL_RUNNING + + def test_runlevel_matches_standalone(self, nplay_session, client_session): + """STANDALONE and CLIENT must agree on the runlevel.""" + assert client_session.runlevel == nplay_session.runlevel + + def test_sync_does_not_timeout(self, client_session): + """sync() in CLIENT mode must complete within the timeout.""" + client_session.sync(timeout=2.0) + + def test_sync_after_setter_observes_new_state(self, client_session): + """Round-trip a CLIENT-issued setter through sync() and read back. + + If sync() were satisfied by a heartbeat (the pre-fix behavior) we + could observe stale state on the read-back, since the actual + SYSREPRUNLEV reply might still be in flight along with the + device-applied CHANSET acknowledgments. This test exercises the + full happy path: configure → sync → read. + """ + client_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_1kHz, + disable_others=True, + ) + client_session.sync(timeout=2.0) + # The synced state must be visible. + ids = client_session.get_matching_channel_ids( + ChannelType.FRONTEND, n_chans=N_CHANS, + ) + smpgroups = client_session.get_channels_field( + ChannelType.FRONTEND, ChanInfoField.SMPGROUP, N_CHANS, + ) + assert len(ids) == N_CHANS + assert all(g == int(SampleRate.SR_1kHz) for g in smpgroups), smpgroups + # Restore for any later tests sharing the nplay_session. + client_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=True, + ) + client_session.sync(timeout=2.0) + + def test_runlevel_stable_during_steady_state(self, client_session): + """Runlevel must remain RUNNING across reads and not flicker. + + Before the wrap-marker fix in the rec ring buffer, the CLIENT + would observe garbage SYSREP-family packets after the first + buffer wrap, firing spurious runlevel-change events. This test + is a lightweight check that runlevel is stable; the deeper wrap + regression test is :class:`TestClientBufferWrapRegression`. + """ + for _ in range(5): + assert client_session.runlevel == RUNLEVEL_RUNNING + time.sleep(0.05) + + +# --------------------------------------------------------------------------- +# Buffer-wrap regression test +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def client_session_full_rate(nplay_session): + """CLIENT session with all FRONTEND channels enabled at 30 kHz. + + Used to drive enough data through the rec ring buffer to force at + least one wrap during the test. Each group packet is ~528 bytes + (256 ch × int16 + header), so at 30 kHz that's ~15.84 MB/s; the + rec buffer is ~268 MB and wraps in ~17 s. + """ + n_fe = nplay_session.num_fe_chans() + nplay_session.set_sample_group( + n_fe, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=False, + ) + nplay_session.sync(timeout=2.0) + + with Session(DeviceType.NPLAY) as client: + time.sleep(1) + yield client + + # Restore the small-channel default for sibling tests. + nplay_session.set_sample_group( + N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz, + disable_others=True, + ) + nplay_session.sync(timeout=2.0) + + +class TestClientBufferWrapRegression: + """Verify CLIENT data integrity across a rec ring buffer wrap. + + Regression test for the wrap-marker fix in + ``cbshm/src/shmem_session.cpp``. Before the fix, when the writer + wrapped the rec ring buffer it left a 1+ dword gap that the reader + had no way to detect — the CLIENT would read garbage as fake + packets after the first wrap, manifesting as out-of-range + ``chid`` / ``type`` values and spurious runlevel-change events. + + The test runs long enough (~20 s) to guarantee at least one wrap + at the full ~15.84 MB/s rate, then asserts that no malformed + packets reached the user callback and that the runlevel is still + correct. + """ + + def test_no_malformed_packets_through_wrap(self, client_session_full_rate): + """No malformed packets must reach a user catch-all callback.""" + # Cache outside the hot callback path. + max_chans = client_session_full_rate.max_chans() + bad = [] + good_count = [0] + + @client_session_full_rate.on_packet() + def on_pkt(header, data): + good_count[0] += 1 + # Real packets have either chid == 0 (group sample) or chid in + # 1..max_chans (events / chaninfo) or chid == 0x8000 + # (configuration channel). The high byte of `type` is always 0 + # in the current protocol; a non-zero high byte indicates the + # bytes were misinterpreted as a header. + if (header.type & 0xFF00) != 0: + bad.append(("type-high-bits", int(header.type), int(header.chid))) + elif header.chid != 0 and header.chid != 0x8000: + if header.chid > max_chans: + bad.append(("chid-out-of-range", int(header.chid), + int(header.type))) + + # Run long enough to force at least one wrap of the ~268 MB rec + # buffer at ~15.84 MB/s. 20 s gives a comfortable margin. + time.sleep(20) + + assert good_count[0] > 0, "CLIENT received no packets" + assert not bad, ( + f"CLIENT received {len(bad)} malformed packet(s) after a buffer " + f"wrap (first 5: {bad[:5]}). This usually means the rec ring " + f"buffer wrap-marker handling regressed." + ) + # Runlevel must still be sane after going through wraps. + assert client_session_full_rate.runlevel == RUNLEVEL_RUNNING diff --git a/src/cbsdk/include/cbsdk/sdk_session.h b/src/cbsdk/include/cbsdk/sdk_session.h index 3f518a00..089df375 100644 --- a/src/cbsdk/include/cbsdk/sdk_session.h +++ b/src/cbsdk/include/cbsdk/sdk_session.h @@ -842,19 +842,23 @@ class SdkSession { /// Wait for SYSREP packet (helper for handshaking) /// @param timeout_ms Timeout in milliseconds - /// @param expected_runlevel Expected runlevel (0 = any SYSREP) - /// @return true if SYSREP received with expected runlevel, false if timeout - bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0) const; + /// @param expected_runlevel Expected runlevel (0 = any runlevel) + /// @param expected_type Expected SYSREP packet type (0 = any 0x10..0x1F) + /// @return true if matching SYSREP received, false if timeout + bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0, + uint16_t expected_type = 0) const; /// Send a runlevel command packet to the device (internal version with wait_for_runlevel) /// @param runlevel Desired runlevel (cbRUNLEVEL_*) /// @param resetque Channel for reset to queue on /// @param runflags Lock recording after reset - /// @param wait_for_runlevel Runlevel to wait for (0 = any SYSREP) + /// @param wait_for_runlevel Runlevel to wait for (0 = any runlevel) /// @param timeout_ms Timeout in milliseconds + /// @param expected_type Expected SYSREP type to wait for (0 = any 0x10..0x1F) /// @return Result indicating success or error Result setSystemRunLevel(uint32_t runlevel, uint32_t resetque, uint32_t runflags, - uint32_t wait_for_runlevel, uint32_t timeout_ms); + uint32_t wait_for_runlevel, uint32_t timeout_ms, + uint16_t expected_type = 0); /// Request configuration with custom timeout (internal version) /// @param timeout_ms Timeout in milliseconds diff --git a/src/cbsdk/src/sdk_session.cpp b/src/cbsdk/src/sdk_session.cpp index 2a4d2cf8..2e48e3ff 100644 --- a/src/cbsdk/src/sdk_session.cpp +++ b/src/cbsdk/src/sdk_session.cpp @@ -233,6 +233,11 @@ struct SdkSession::Impl { // Handshake state (for performStartupHandshake) std::atomic device_runlevel{0}; std::atomic received_sysrep{false}; + // Sticky flag set when a SYSREPRUNLEV (0x12) is observed since the last + // reset. Stays true even if a later SYSREP (0x10) heartbeat arrives, so + // sync()'s wait can't be raced by a heartbeat clobbering a "last type" + // field. Reset alongside received_sysrep before each send. + std::atomic received_sysrepRunlev{false}; std::mutex handshake_mutex; std::condition_variable handshake_cv; @@ -892,6 +897,9 @@ Result SdkSession::start() { if ((pkt.cbpkt_header.type & 0xF0) == cbPKTTYPE_SYSREP) { const auto* sysinfo = reinterpret_cast(&pkt); impl->updateRunlevel(sysinfo->runlevel); + if (pkt.cbpkt_header.type == cbPKTTYPE_SYSREPRUNLEV) { + impl->received_sysrepRunlev.store(true, std::memory_order_release); + } impl->received_sysrep.store(true, std::memory_order_release); impl->handshake_cv.notify_all(); } @@ -1203,6 +1211,9 @@ Result SdkSession::start() { if ((packets[i].cbpkt_header.type & 0xF0) == cbPKTTYPE_SYSREP) { const auto* sysinfo = reinterpret_cast(&packets[i]); impl->updateRunlevel(sysinfo->runlevel); + if (packets[i].cbpkt_header.type == cbPKTTYPE_SYSREPRUNLEV) { + impl->received_sysrepRunlev.store(true, std::memory_order_release); + } impl->received_sysrep.store(true, std::memory_order_release); impl->handshake_cv.notify_all(); } @@ -1486,7 +1497,13 @@ const cbPKT_FILTINFO* SdkSession::getFilterInfo(const uint32_t filter_id) const } uint32_t SdkSession::getRunLevel() const { - return m_impl->device_runlevel.load(std::memory_order_acquire); + uint32_t rl = m_impl->device_runlevel.load(std::memory_order_acquire); + if (rl != 0) return rl; + // Fall back to the SYSINFO mirrored into shmem by the STANDALONE owner. + // In CLIENT mode the device only emits SYSREP on runlevel-set commands, + // so this session's receive ring may never see one in steady state. + if (const auto* si = getSysInfo()) return si->runlevel; + return 0; } bool SdkSession::isStandalone() const { @@ -2235,20 +2252,26 @@ Result SdkSession::sync(uint32_t timeout_ms) { // Send a no-op runlevel SET (current runlevel) as a sync barrier. // The device processes packets in order, so the resulting SYSREPRUNLEV // confirms all prior configuration packets have been applied. - uint32_t current = m_impl->device_runlevel.load(std::memory_order_acquire); + // + // Use getRunLevel() (not the raw atomic): in CLIENT mode the SYSREP + // ring may be empty (the STANDALONE owner already handshook before we + // attached), so the atomic stays at 0. getRunLevel() falls back to the + // SYSINFO mirror in shmem. Sending runlevel=0 here would risk + // perturbing device state, so it must be the real current runlevel. + uint32_t current = getRunLevel(); Result result = Result::error("No session available"); if (m_impl->device_session) { // STANDALONE: use DeviceSession::setSystemRunLevelSync which matches // the specific SYSREPRUNLEV (type 0x12) response via sendAndWait. - // The old boolean waitForSysrep path matched any SYSREP (0x10-0x1F) - // and was falsely triggered by periodic SYSREP (0x10) heartbeats - // from nPlayServer. result = m_impl->device_session->setSystemRunLevelSync( current, 0, 0, std::chrono::milliseconds(timeout_ms)); } else { - // CLIENT mode: route through shmem (uses the boolean waitForSysrep path) - result = setSystemRunLevel(current, 0, 0, 0, timeout_ms); + // CLIENT: route through shmem and wait for SYSREPRUNLEV (0x12) + // specifically — periodic SYSREP heartbeats (0x10) from + // nPlayServer would otherwise falsely satisfy the wait. + result = setSystemRunLevel(current, 0, 0, 0, timeout_ms, + cbPKTTYPE_SYSREPRUNLEV); } if (result.isError()) return result; @@ -2256,7 +2279,7 @@ Result SdkSession::sync(uint32_t timeout_ms) { // The device also sends SYSREP on network error / reset. If the // returned runlevel is HARDRESET or STANDBY the device dropped our // config packets and restarted. - uint32_t rl = m_impl->device_runlevel.load(std::memory_order_acquire); + uint32_t rl = getRunLevel(); if (rl <= cbRUNLEVEL_STANDBY) { return Result::error( "Device reset during configuration (runlevel " + std::to_string(rl) + ")"); @@ -2401,20 +2424,25 @@ Result SdkSession::sendPacket(const cbPKT_GENERIC& pkt) { /// Helper: Wait for SYSREP packet ///-------------------------------------------------------------------------------------------- -bool SdkSession::waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel) const { - // Wait for SYSREP packet with optional expected runlevel - // If expected_runlevel is 0, accept any SYSREP - // If expected_runlevel is non-zero, wait for that specific runlevel +bool SdkSession::waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel, + uint16_t expected_type) const { + // Wait for SYSREP packet, optionally filtered by type and/or runlevel. + // expected_type == 0: any 0x10..0x1F packet + // expected_type == cbPKTTYPE_SYSREPRUNLEV (0x12): + // requires the sticky 0x12 flag + // expected_runlevel == 0: accept any runlevel std::unique_lock lock(m_impl->handshake_mutex); return m_impl->handshake_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), - [this, expected_runlevel] { - if (!m_impl->received_sysrep.load(std::memory_order_acquire)) { - return false; // Haven't received SYSREP yet + [this, expected_runlevel, expected_type] { + if (expected_type == cbPKTTYPE_SYSREPRUNLEV) { + if (!m_impl->received_sysrepRunlev.load(std::memory_order_acquire)) + return false; + } else if (!m_impl->received_sysrep.load(std::memory_order_acquire)) { + return false; } if (expected_runlevel == 0) { - return true; // Accept any SYSREP + return true; } - // Check if runlevel matches return m_impl->device_runlevel.load(std::memory_order_acquire) == expected_runlevel; }); } @@ -2428,9 +2456,11 @@ Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, } Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, uint32_t runflags, - uint32_t wait_for_runlevel, uint32_t timeout_ms) { + uint32_t wait_for_runlevel, uint32_t timeout_ms, + uint16_t expected_type) { // Reset handshake state before sending m_impl->received_sysrep.store(false, std::memory_order_relaxed); + m_impl->received_sysrepRunlev.store(false, std::memory_order_relaxed); // Send the runlevel packet Result send_result = Result::error("No session available"); @@ -2452,7 +2482,7 @@ Result SdkSession::setSystemRunLevel(uint32_t runlevel, uint32_t resetque, return send_result; // Wait for SYSREP response - if (!waitForSysrep(timeout_ms, wait_for_runlevel)) { + if (!waitForSysrep(timeout_ms, wait_for_runlevel, expected_type)) { if (wait_for_runlevel != 0) return Result::error("No SYSREP response with expected runlevel " + std::to_string(wait_for_runlevel)); return Result::error("No SYSREP response received for setSystemRunLevel"); @@ -2500,6 +2530,7 @@ Result SdkSession::performStartupHandshake(uint32_t timeout_ms) { // Reset handshake state m_impl->received_sysrep.store(false, std::memory_order_relaxed); + m_impl->received_sysrepRunlev.store(false, std::memory_order_relaxed); m_impl->device_runlevel.store(0, std::memory_order_relaxed); // Quick presence check - use shorter timeout to fail fast for non-existent devices diff --git a/src/cbshm/src/shmem_session.cpp b/src/cbshm/src/shmem_session.cpp index 4c1ceaae..ea26eb5d 100644 --- a/src/cbshm/src/shmem_session.cpp +++ b/src/cbshm/src/shmem_session.cpp @@ -29,11 +29,58 @@ #include #include #include +#include #include #include // std::gcd namespace cbshm { +namespace { + +// Cross-process publication helpers for the shmem ring buffer head/tail +// fields. The underlying memory lives in a shared mmap'd region (not in a +// std::atomic<>), so we use compiler-specific load/store + memory_order +// semantics. GCC/Clang have __atomic_*; MSVC needs the Interlocked +// intrinsics, with a thread fence to provide acquire ordering on ARM64 +// (on x86/x64 aligned loads/stores already imply acquire/release). +inline void shm_store_release_u32(uint32_t* p, uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + __atomic_store_n(p, v, __ATOMIC_RELEASE); +#else + // MSVC: ensure prior writes complete before publishing v. + std::atomic_thread_fence(std::memory_order_release); + *reinterpret_cast(p) = v; +#endif +} + +inline void shm_store_relaxed_u32(uint32_t* p, uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + __atomic_store_n(p, v, __ATOMIC_RELAXED); +#else + *reinterpret_cast(p) = v; +#endif +} + +inline uint32_t shm_load_acquire_u32(const uint32_t* p) { +#if defined(__GNUC__) || defined(__clang__) + return __atomic_load_n(p, __ATOMIC_ACQUIRE); +#else + uint32_t v = *reinterpret_cast(p); + std::atomic_thread_fence(std::memory_order_acquire); + return v; +#endif +} + +inline uint32_t shm_load_relaxed_u32(const uint32_t* p) { +#if defined(__GNUC__) || defined(__clang__) + return __atomic_load_n(p, __ATOMIC_RELAXED); +#else + return *reinterpret_cast(p); +#endif +} + +} // namespace + /////////////////////////////////////////////////////////////////////////////////////////////////// /// @brief Platform-specific implementation details (Pimpl idiom) /// @@ -299,7 +346,23 @@ struct ShmemSession::Impl { is_open = false; } - /// @brief Write a packet to the receive buffer ring + /// @brief Write a packet to the receive buffer ring. + /// + /// Cross-process publication: the consumer (CLIENT) reads head_index and + /// then reads packet bytes up to that index. On weak memory ordering + /// architectures (e.g. ARM/Apple Silicon) the head_index update must be + /// release-ordered with respect to the preceding memcpy and wrap update, + /// otherwise the consumer can observe an advanced head_index but stale + /// or partial bytes — which manifests as misaligned packets. + /// + /// Wrap padding: when a packet would not fit at @c head, the writer wraps + /// to offset 0. The skipped bytes between the previous packet and + /// @c buflen would otherwise leave the consumer's tail stranded inside + /// random gap data after the wrap. We pad the gap with a synthetic "wrap + /// marker" packet (chid=0, type=0, non-zero dlen) so the consumer can + /// advance tail through it cleanly and drop into the wrap. Wrap policy + /// also ensures the gap left after each write is either 0 or large + /// enough (>= cbPKT_HEADER_32SIZE) to fit a marker. Result writeToReceiveBuffer(const cbPKT_GENERIC& pkt) { if (!rec_buffer_raw) { return Result::error("Receive buffer not initialized"); @@ -312,20 +375,50 @@ struct ShmemSession::Impl { } uint32_t head = recHeadindex(); + uint32_t* buf = recBuffer(); - if (head + pkt_size_words > rec_buffer_len) { + // Decide whether to wrap. Wrap if either (a) the packet would not + // fit, or (b) writing it would leave a 1..3 dword tail gap that we + // cannot mark with a wrap-marker header on the next wrap. + const uint32_t end_after = head + pkt_size_words; + bool need_wrap = false; + if (end_after > rec_buffer_len) { + need_wrap = true; + } else if (end_after < rec_buffer_len && + (rec_buffer_len - end_after) < cbPKT_HEADER_32SIZE) { + need_wrap = true; + } + + if (need_wrap) { + // Pad the gap [head, buflen) with a wrap marker so the consumer + // can step over it. By the wrap-policy invariant above, gap is + // either 0 or >= cbPKT_HEADER_32SIZE. + uint32_t gap_dwords = rec_buffer_len - head; + if (gap_dwords >= cbPKT_HEADER_32SIZE) { + cbPKT_HEADER marker{}; + marker.time = 0; + marker.chid = 0; + marker.type = 0; + marker.dlen = static_cast(gap_dwords - cbPKT_HEADER_32SIZE); + marker.instrument = 0; + marker.reserved = 0; + std::memcpy(&buf[head], &marker, sizeof(cbPKT_HEADER)); + } head = 0; - recHeadwrap()++; + shm_store_relaxed_u32(&recHeadwrap(), recHeadwrap() + 1); } - uint32_t* buf = recBuffer(); const uint32_t* pkt_data = reinterpret_cast(&pkt); std::memcpy(&buf[head], pkt_data, pkt_size_words * sizeof(uint32_t)); - recHeadindex() = head + pkt_size_words; recReceived()++; recLasttime() = pkt.cbpkt_header.time; + // Release fence: head_index store synchronizes-with the consumer's + // acquire load, ensuring all prior writes (marker, memcpy, wrap, + // lasttime) are visible before the consumer sees the new head_index. + shm_store_release_u32(&recHeadindex(), head + pkt_size_words); + return Result::ok(); } @@ -505,9 +598,11 @@ struct ShmemSession::Impl { // In CLIENT mode, sync our read position to the current head so we only // read NEW packets, not stale data that was already in the ring buffer. + // Use acquire load on head_index to pair with the producer's release + // store (see writeToReceiveBuffer). if (mode == Mode::CLIENT) { - rec_tailindex = recHeadindex(); - rec_tailwrap = recHeadwrap(); + rec_tailindex = shm_load_acquire_u32(&recHeadindex()); + rec_tailwrap = shm_load_relaxed_u32(&recHeadwrap()); } // Detect protocol version for CENTRAL_COMPAT mode @@ -1843,8 +1938,12 @@ Result ShmemSession::readReceiveBuffer(cbPKT_GENERIC* packets, size_t max_ uint32_t* buf = m_impl->recBuffer(); uint32_t buflen = m_impl->rec_buffer_len; - uint32_t head_index = m_impl->recHeadindex(); - uint32_t head_wrap = m_impl->recHeadwrap(); + // Acquire-load: pairs with the producer's release-store of head_index in + // writeToReceiveBuffer. Without this, on weak memory architectures + // (ARM/Apple Silicon) we can observe an advanced head_index but stale or + // partial packet bytes, leading to misaligned reads of the ring buffer. + uint32_t head_index = shm_load_acquire_u32(&m_impl->recHeadindex()); + uint32_t head_wrap = shm_load_relaxed_u32(&m_impl->recHeadwrap()); if (m_impl->rec_tailwrap == head_wrap && m_impl->rec_tailindex == head_index) { return Result::ok(); @@ -1905,6 +2004,20 @@ Result ShmemSession::readReceiveBuffer(cbPKT_GENERIC* packets, size_t max_ raw_header_32size = cbPKT_HEADER_32SIZE; // 4 auto* hdr = reinterpret_cast(&buf[m_impl->rec_tailindex]); raw_dlen = hdr->dlen; + + // Wrap-marker packet inserted by the writer to fill the unused + // gap before a wrap-around (chid=0, type=0, dlen != 0). Skip + // silently — advance tail past the marker without reporting it + // to the caller so it never reaches user callbacks. + if (hdr->chid == 0 && hdr->type == 0 && hdr->dlen != 0) { + uint32_t marker_size = cbPKT_HEADER_32SIZE + hdr->dlen; + m_impl->rec_tailindex += marker_size; + if (m_impl->rec_tailindex >= buflen) { + m_impl->rec_tailindex -= buflen; + m_impl->rec_tailwrap++; + } + continue; + } } uint32_t pkt_size_dwords = raw_header_32size + raw_dlen;