Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions examples/SimpleDevice/simple_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ DeviceType parseDeviceType(const std::string& type_str) {
std::exit(1);
}

/// Map runlevel value to name
const char* getRunLevelName(uint32_t runlevel) {
switch (runlevel) {
case 0: return "UNKNOWN";
case cbRUNLEVEL_STARTUP: return "STARTUP";
case cbRUNLEVEL_HARDRESET: return "HARDRESET";
case cbRUNLEVEL_STANDBY: return "STANDBY";
case cbRUNLEVEL_RESET: return "RESET";
case cbRUNLEVEL_RUNNING: return "RUNNING";
case cbRUNLEVEL_STRESSED: return "STRESSED";
case cbRUNLEVEL_ERROR: return "ERROR";
case cbRUNLEVEL_SHUTDOWN: return "SHUTDOWN";
case cbRUNLEVEL_UPDATE: return "UPDATE";
default: return "?";
}
}

/// Get device type name string
const char* getDeviceTypeName(DeviceType type) {
switch (type) {
Expand All @@ -111,14 +128,23 @@ int main(int argc, char* argv[]) {
std::cout << "================================================\n\n";

// Parse command line arguments
// simple_device [device_type] [duration_seconds]
// duration_seconds <= 0 means "run until Ctrl+C" (default).
DeviceType device_type = DeviceType::LEGACY_NSP; // Default to NSP
int duration_seconds = 0;

if (argc >= 2) {
device_type = parseDeviceType(argv[1]);
}
if (argc >= 3) {
duration_seconds = std::atoi(argv[2]);
}

std::cout << "Configuration:\n";
std::cout << " Device Type: " << getDeviceTypeName(device_type) << "\n";
if (duration_seconds > 0) {
std::cout << " Duration: " << duration_seconds << "s (auto-stop)\n";
}

// Register signal handler for clean shutdown
signal(SIGINT, signalHandler);
Expand Down Expand Up @@ -167,11 +193,30 @@ int main(int argc, char* argv[]) {

std::cout << "Packet callback registered.\n\n";

// Log runlevel transitions reported by the device.
session.registerRunlevelChangeCallback([](uint32_t rl) {
std::cout << "\n[runlevel change] -> " << rl
<< " (" << getRunLevelName(rl) << ")\n";
});

// Step 4: Session is already running (auto-started by SDK)
std::cout << "SDK session is running and receiving packets...\n\n";
std::cout << "SDK session is running and receiving packets...\n";
std::cout << " Standalone: " << (session.isStandalone() ? "yes" : "no (CLIENT)") << "\n";
std::cout << " Proto ver: 0x" << std::hex << session.getProtocolVersion() << std::dec << "\n";
std::cout << " Proc ident: " << session.getProcIdent() << "\n";
{
uint32_t rl = session.getRunLevel();
std::cout << " Runlevel: " << rl << " (" << getRunLevelName(rl) << ")\n";
}
std::cout << "\n";

// Step 5: Run for specified duration, showing statistics
std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n";
if (duration_seconds > 0) {
std::cout << "Receiving packets for " << duration_seconds
<< "s (or Ctrl+C to stop early)...\n\n";
} else {
std::cout << "Receiving packets... (Press Ctrl+C to stop)\n\n";
}
std::cout << "Statistics (updated every second):\n";
std::cout << "-----------------------------------\n";

Expand All @@ -186,9 +231,17 @@ int main(int argc, char* argv[]) {
auto now = std::chrono::steady_clock::now();
seconds_elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start_time).count();

if (duration_seconds > 0 && seconds_elapsed >= duration_seconds) {
g_running = false;
}

uint32_t runlevel = session.getRunLevel();

// Print statistics
std::cout << "\r[" << std::setw(3) << seconds_elapsed << "s] "
<< "Total: " << std::setw(8) << packet_count.load()
<< "RL: " << std::setw(2) << runlevel << " (" << std::setw(9) << std::left
<< getRunLevelName(runlevel) << std::right << ")"
<< " | Total: " << std::setw(8) << packet_count.load()
<< " | Config: " << std::setw(6) << config_count.load()
<< " | Spikes: " << std::setw(8) << spike_count.load()
<< " | RX: " << std::setw(10) << stats.packets_received_from_device
Expand Down
170 changes: 170 additions & 0 deletions pycbsdk/tests/test_client_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
SampleRate,
Session,
)
from pycbsdk.session import RUNLEVEL_RUNNING

pytestmark = pytest.mark.integration

Expand Down Expand Up @@ -286,3 +287,172 @@ def test_device_time_nonzero(self, client_session):
"""CLIENT should read device time from shmem."""
t = client_session.time
assert t > 0


# ---------------------------------------------------------------------------
# Runlevel and sync in CLIENT mode
# ---------------------------------------------------------------------------


class TestClientRunlevelAndSync:
"""Verify CLIENT-mode runlevel reporting and sync().

Regression coverage for two bugs:

1. CLIENT-mode ``getRunLevel()`` previously returned 0. The atomic it
reads only updates when a SYSREP packet flows through the CLIENT's
receive ring, but devices only emit SYSREP in response to runlevel
commands — so after the STANDALONE owner finished its handshake the
CLIENT's atomic stayed at 0 indefinitely. Fix: ``getRunLevel()``
falls back to the SYSINFO mirror in shmem (which the STANDALONE
writes via ``setSysInfo`` on every SYSREP).

2. CLIENT-mode ``sync()`` previously waited on any 0x10..0x1F SYSREP,
so periodic SYSREP (0x10) heartbeats from nPlayServer could falsely
satisfy the wait before the actual SYSREPRUNLEV (0x12) reply
arrived. ``sync()`` could also send SYSSETRUNLEV with
``runlevel=0`` because it read the per-session atomic instead of
``getRunLevel()``. Fix: ``sync()`` uses ``getRunLevel()`` and
waits on a sticky ``received_sysrepRunlev`` flag set only on
type 0x12.
"""

def test_runlevel_is_running(self, client_session):
"""CLIENT must report RUNLEVEL_RUNNING (50), not 0."""
assert client_session.runlevel == RUNLEVEL_RUNNING

def test_runlevel_matches_standalone(self, nplay_session, client_session):
"""STANDALONE and CLIENT must agree on the runlevel."""
assert client_session.runlevel == nplay_session.runlevel

def test_sync_does_not_timeout(self, client_session):
"""sync() in CLIENT mode must complete within the timeout."""
client_session.sync(timeout=2.0)

def test_sync_after_setter_observes_new_state(self, client_session):
"""Round-trip a CLIENT-issued setter through sync() and read back.

If sync() were satisfied by a heartbeat (the pre-fix behavior) we
could observe stale state on the read-back, since the actual
SYSREPRUNLEV reply might still be in flight along with the
device-applied CHANSET acknowledgments. This test exercises the
full happy path: configure → sync → read.
"""
client_session.set_sample_group(
N_CHANS, ChannelType.FRONTEND, SampleRate.SR_1kHz,
disable_others=True,
)
client_session.sync(timeout=2.0)
# The synced state must be visible.
ids = client_session.get_matching_channel_ids(
ChannelType.FRONTEND, n_chans=N_CHANS,
)
smpgroups = client_session.get_channels_field(
ChannelType.FRONTEND, ChanInfoField.SMPGROUP, N_CHANS,
)
assert len(ids) == N_CHANS
assert all(g == int(SampleRate.SR_1kHz) for g in smpgroups), smpgroups
# Restore for any later tests sharing the nplay_session.
client_session.set_sample_group(
N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz,
disable_others=True,
)
client_session.sync(timeout=2.0)

def test_runlevel_stable_during_steady_state(self, client_session):
"""Runlevel must remain RUNNING across reads and not flicker.

Before the wrap-marker fix in the rec ring buffer, the CLIENT
would observe garbage SYSREP-family packets after the first
buffer wrap, firing spurious runlevel-change events. This test
is a lightweight check that runlevel is stable; the deeper wrap
regression test is :class:`TestClientBufferWrapRegression`.
"""
for _ in range(5):
assert client_session.runlevel == RUNLEVEL_RUNNING
time.sleep(0.05)


# ---------------------------------------------------------------------------
# Buffer-wrap regression test
# ---------------------------------------------------------------------------


@pytest.fixture()
def client_session_full_rate(nplay_session):
"""CLIENT session with all FRONTEND channels enabled at 30 kHz.

Used to drive enough data through the rec ring buffer to force at
least one wrap during the test. Each group packet is ~528 bytes
(256 ch × int16 + header), so at 30 kHz that's ~15.84 MB/s; the
rec buffer is ~268 MB and wraps in ~17 s.
"""
n_fe = nplay_session.num_fe_chans()
nplay_session.set_sample_group(
n_fe, ChannelType.FRONTEND, SampleRate.SR_30kHz,
disable_others=False,
)
nplay_session.sync(timeout=2.0)

with Session(DeviceType.NPLAY) as client:
time.sleep(1)
yield client

# Restore the small-channel default for sibling tests.
nplay_session.set_sample_group(
N_CHANS, ChannelType.FRONTEND, SampleRate.SR_30kHz,
disable_others=True,
)
nplay_session.sync(timeout=2.0)


class TestClientBufferWrapRegression:
"""Verify CLIENT data integrity across a rec ring buffer wrap.

Regression test for the wrap-marker fix in
``cbshm/src/shmem_session.cpp``. Before the fix, when the writer
wrapped the rec ring buffer it left a 1+ dword gap that the reader
had no way to detect — the CLIENT would read garbage as fake
packets after the first wrap, manifesting as out-of-range
``chid`` / ``type`` values and spurious runlevel-change events.

The test runs long enough (~20 s) to guarantee at least one wrap
at the full ~15.84 MB/s rate, then asserts that no malformed
packets reached the user callback and that the runlevel is still
correct.
"""

def test_no_malformed_packets_through_wrap(self, client_session_full_rate):
"""No malformed packets must reach a user catch-all callback."""
# Cache outside the hot callback path.
max_chans = client_session_full_rate.max_chans()
bad = []
good_count = [0]

@client_session_full_rate.on_packet()
def on_pkt(header, data):
good_count[0] += 1
# Real packets have either chid == 0 (group sample) or chid in
# 1..max_chans (events / chaninfo) or chid == 0x8000
# (configuration channel). The high byte of `type` is always 0
# in the current protocol; a non-zero high byte indicates the
# bytes were misinterpreted as a header.
if (header.type & 0xFF00) != 0:
bad.append(("type-high-bits", int(header.type), int(header.chid)))
elif header.chid != 0 and header.chid != 0x8000:
if header.chid > max_chans:
bad.append(("chid-out-of-range", int(header.chid),
int(header.type)))

# Run long enough to force at least one wrap of the ~268 MB rec
# buffer at ~15.84 MB/s. 20 s gives a comfortable margin.
time.sleep(20)

assert good_count[0] > 0, "CLIENT received no packets"
assert not bad, (
f"CLIENT received {len(bad)} malformed packet(s) after a buffer "
f"wrap (first 5: {bad[:5]}). This usually means the rec ring "
f"buffer wrap-marker handling regressed."
)
# Runlevel must still be sane after going through wraps.
assert client_session_full_rate.runlevel == RUNLEVEL_RUNNING
14 changes: 9 additions & 5 deletions src/cbsdk/include/cbsdk/sdk_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -842,19 +842,23 @@ class SdkSession {

/// Wait for SYSREP packet (helper for handshaking)
/// @param timeout_ms Timeout in milliseconds
/// @param expected_runlevel Expected runlevel (0 = any SYSREP)
/// @return true if SYSREP received with expected runlevel, false if timeout
bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0) const;
/// @param expected_runlevel Expected runlevel (0 = any runlevel)
/// @param expected_type Expected SYSREP packet type (0 = any 0x10..0x1F)
/// @return true if matching SYSREP received, false if timeout
bool waitForSysrep(uint32_t timeout_ms, uint32_t expected_runlevel = 0,
uint16_t expected_type = 0) const;

/// Send a runlevel command packet to the device (internal version with wait_for_runlevel)
/// @param runlevel Desired runlevel (cbRUNLEVEL_*)
/// @param resetque Channel for reset to queue on
/// @param runflags Lock recording after reset
/// @param wait_for_runlevel Runlevel to wait for (0 = any SYSREP)
/// @param wait_for_runlevel Runlevel to wait for (0 = any runlevel)
/// @param timeout_ms Timeout in milliseconds
/// @param expected_type Expected SYSREP type to wait for (0 = any 0x10..0x1F)
/// @return Result indicating success or error
Result<void> setSystemRunLevel(uint32_t runlevel, uint32_t resetque, uint32_t runflags,
uint32_t wait_for_runlevel, uint32_t timeout_ms);
uint32_t wait_for_runlevel, uint32_t timeout_ms,
uint16_t expected_type = 0);

/// Request configuration with custom timeout (internal version)
/// @param timeout_ms Timeout in milliseconds
Expand Down
Loading
Loading