Skip to content

In-stream looping + Context Manager API#3

Merged
ps-cd merged 48 commits into
mainfrom
stream_loop
Jun 2, 2025
Merged

In-stream looping + Context Manager API#3
ps-cd merged 48 commits into
mainfrom
stream_loop

Conversation

@ps-cd

@ps-cd ps-cd commented Jun 2, 2025

Copy link
Copy Markdown
Contributor

In-stream looping

Motivation and requirements: repeating the same sequence multiple times without fluctuating gap between iterations. Should be able to interrupt without waiting for all repetitions to complete. Should stop if any of the workers fails.

So far reps were done by calling stream_run() in a Python for-loop. NI tasks are stopped at the end of each rep and then are re-started at the beginning of the next one leading to a fluctuating 15-30 ms gap. The only way to get rigid inter-rep timing was to concatenate all reps into a single long sequence. Issues: not interruptible, workers don't stop if a peer fails, compile time can become quite long practically limiting nreps.

Here we are switching to in-stream looping - tasks are not stopped and re-started between reps, for NI cards the whole loop is a single continuous stream and we just repetitively write the same waveform into it (we need to re-calculate the samples though since storing them would require a huge amount of memory).

The following steps were necessary:

(1) Stop mechanism: "Finite samps" for short sequences and "Soft stop" for all others

NI DAQmx API provides 2 task stop modes - "Finite samples" and "Continuous samples".

  • DAQMX_VAL_FINITESAMPS - hardware counts the nuber of generated samples and stops automatically when the onboard counter reaches the specified total_samps_per_chan;
  • DAQMX_VAL_CONTSAMPS - generation runs until a software-timed stop call is made.

Originally, we always used DAQMX_VAL_FINITESAMPS mode. This method cannot be directly applied to in-stream looping - the user can request a stop at any time during the loop while the originally specified total_samps_per_chan cannot be changed on-the-fly. So one would need to use DAQMX_VAL_CONTSAMPS mode.

The challenge with continuous mode is to stop generation properly. Just calling software-timed stop may not hit the sequence end precisely thus leaving channels at potentially unsafe values. We use a "soft stop":

  • Remember last written values for each channel after every write call;
  • When stop is requested, write the final sample chunk and then write a full "dummy" buffer filled with the last written values;
  • Keep polling play position and do a software-timed stop when generation enters the dummy buffer.

This logic works well for "long" sequences - when sequence duration is greater than the calc/write chunksize (typically 100-150 ms) - but becomes challenging for anything shorter. For stream stability it is important that there is about chunksize samples in the buffer before launching and about that amount still left to play whenever any logic (calculate the next chunk, check stop flags and switch to stop buffer computation) is done at runtime. For a short sequence even the initial buffer will not be fully filled. And trying to do stop flag checks between repetitions would effectively mean reducing calculation/write chunk size below the stable chunksize.

This is why we have to forbid in-stream looping for short sequences - we use the "Finite Samples" stop mode and restrict to instream_reps = 1 for them. For long sequences, we switch to the "Continuous mode" + "soft stop" and allow for any instream_reps >= 1. This also resolves a connected side issue with 6535 cards being limited by a 32-bit counter (see this explanation).


There is another stop approach which would work for any sequence length - simply stop writing samples past the current repetition and let the task fail by underflow. This works - effectively does precise stop and the task seems to be usable after the underflow, but one needs to absorb the error return a couple of times. The main issue here is that it is hard to distinguish between an intentional underflow and an actual underflow that happened during the sequence. One can check the total_samps_generated property, but the test showed that it actually gives an overestimate - it is likely just counting sample clock ticks and sample clock is only stopped about 80 us after the actual underrun has happened. So it may mask the underflow which happened just before the end, which can be very serious. We only used this approach to patch a bug in the old version.

(2) Padding after each repetition to align start times across devices

Clock grids of different devices may not align due to incommensurate sampling rates plus some devices may get an extra tick at the end when compiling due to closing edge clipping. As a result, different devices will in general have different single-repetition play duration.

If not corrected, this difference will systematically add up over multiple repetitions resulting in relative drift between cards.

To avoid this, each worker adds padding samples (filled with the last written value) at the end of each repetition to aim at the common floating-point "target stop time" - (rep_idx + 1) * target_rep_dur, where target_rep_dur is the longest single-rep duration among all cards.

(3) Arc-based peer worker drop alarm

Need: if any worker fails at runtime, others should complete the current repetition but quit after that without starting the next one.

We implement a "drop alarm". Each worker holds a strong pointer to the same Arc<()> instance. If any of the workers quits in any way (by returning an error / by panicking), it drops the handle and strong reference count reduces. Peers check the strong count through their handles after each rep and quit if a drop was detected.

Manager thread. We initially implemented a separate manager thread to watch workers and notify everyone if someone fails. It was later replaced by a much simpler drop alarm approach. But multiple other improvements were made throughout manager implementation - simplified worker error collection, error-case automatic stream shutdown without waiting to reach finally/__exit__ logic in Python, and so on. This is why it was easier to leave the manager thread commits and move to drop alarm from there.

with Context Manager API

So far, stream control has been is exposed to the user through a single run() method. It packed init_stream and a Python for-loop over repetitive launches into try and called close_stream in finally to ensure stream was always closed.

But users may need to add custom logic between repetitive launches while still avoiding unnecessary task re-creation overhead: print progress, control other instruments, take actions based on incoming data analysis.

This is why we now exposed a witht context manager API. Stream initialization and closing is automatically done in __enter__() and __exit__() methods respectively while users can add custom code in the context block. All runtime stream controls are exposed through the returned StreamHandle.

The original run() method is now reimplemented through the context manager but behaves in the same way as before.

Regarding in-stream looping, we decided to only exposed it through StreamHandle.launch(instream_reps) within the with context. It is more "advanced" since it requires a sufficient sequence length in contrast to the relaunch-based repetitions in run(nreps) which can work with any sequence, so it is better to "hide" it further away. It should also help to avoid the confusion between nreps in run() and instream_reps.

ps-cd added 30 commits March 4, 2025 18:15
…shut_down_all_workers()`, `catch_closure()`, `manager` reports only `()` back to `main`
…ished`, `collect_err_info_undo_init_changes`, `request_stop`
…ngs, moved enum match for `worker_loop` to `impl NIDev`, moved `active_dev_names` to `BaseStreamer`
…nager `run`, transition back to `Ready` in `wait_until_finished`); removed `calc_next`; renamed `stream_controls` to `controls`
…til_finished` return, dual-mode `PyAPI.wait_until_finished()`; exposed `chunksize_ms` property
@ps-cd ps-cd merged commit 0f52abd into main Jun 2, 2025
1 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant