Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/bdsim/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,8 @@ def _start_movie(self) -> None:
fps=fps_int, extra_args=["-vcodec", "libx264"]
)
self._writer.setup(fig=self._fig, outfile=self._movie) # type: ignore[union-attr]
# Attach writer to figure so DisplayManager.refresh() can find it for fps-paced grabs.
self._fig._bdsim_movie_writer = self._writer # type: ignore[union-attr]
self._movie_started = True
print("movie block", self, " --> ", self._movie)
except FileNotFoundError:
Expand Down Expand Up @@ -2226,12 +2228,6 @@ def step(self, t: float, inports: list[Any]) -> None:
else:
self._fig.canvas.draw() # type: ignore[union-attr]

if self._movie is not None:
try:
self._writer.grab_frame() # type: ignore[union-attr]
except AttributeError:
self.fatal("cannot save movie, please install ffmpeg") # type: ignore[union-attr]

def done(self, block=False, **kwargs) -> None:
if self._fig is not None:
simstate = getattr(self, "_simstate", None)
Expand Down
22 changes: 22 additions & 0 deletions src/bdsim/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@
import matplotlib.pyplot as plt


def _grab_movie_frame(fig: Any) -> None:
"""Grab one frame for any MP4 writer attached to ``fig``.

The writer is set on ``fig._bdsim_movie_writer`` by
``GraphicsBlock._start_movie`` when a movie path is configured. Co-locating
the grab with the display refresh keeps the movie's frame cadence on the
fps grid (the same hook that drives ``flush_events()`` for live windows),
regardless of how many sink ticks the integrator emits in between.
"""
writer = getattr(fig, "_bdsim_movie_writer", None)
if writer is None:
return
try:
writer.grab_frame()
except AttributeError:
pass


class DisplayManager:
"""Abstract base class and factory for bdsim display managers.

Expand Down Expand Up @@ -228,6 +246,7 @@ def refresh(self) -> None:
handle.update(fig)
except Exception:
pass
_grab_movie_frame(fig)

def refresh_figure(self, fig: Any) -> None:
"""Refresh one figure, including figures not listed by pyplot."""
Expand All @@ -242,6 +261,7 @@ def refresh_figure(self, fig: Any) -> None:
handle.update(fig)
except Exception:
pass
_grab_movie_frame(fig)

def finalize(self, hold: bool = False) -> None:
"""Render a final frame and close all registered figures.
Expand Down Expand Up @@ -279,10 +299,12 @@ def refresh(self) -> None:
for fig in self._iter_figures():
fig.canvas.draw_idle()
fig.canvas.flush_events()
_grab_movie_frame(fig)

def refresh_figure(self, fig: Any) -> None:
fig.canvas.draw_idle()
fig.canvas.flush_events()
_grab_movie_frame(fig)

def finalize(self, hold: bool = False) -> None:
if hold:
Expand Down
67 changes: 48 additions & 19 deletions src/bdsim/run_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,22 +880,23 @@ def _solve_ivp_method(self, simstate: BDSimState) -> str:
return str(simstate.solver)

@staticmethod
def _build_t_eval_grid(t0: float, t1: float, dt: float) -> np.ndarray | None:
"""Build an absolute-time t_eval grid over the closed interval [t0, t1]."""
def _build_t_eval_grid(t0: float, t1: float, dt: float) -> np.ndarray:
"""Build an absolute-time t_eval grid over the closed interval [t0, t1].

The grid always contains the endpoints `t0` and `t1` plus any `k*dt`
multiples that land strictly between them. `dt > 0` and `t0 < t1` are
guaranteed by the caller.
"""
tol = 1e-12
if dt <= 0:
return None
k0 = int(np.ceil((t0 - tol) / dt))
k1 = int(np.floor((t1 + tol) / dt))
if k1 < k0:
return None
grid = dt * np.arange(k0, k1 + 1, dtype=float)
grid = grid[(grid >= (t0 - tol)) & (grid <= (t1 + tol))]
if grid.size == 0:
return None
grid = np.clip(grid, t0, t1)
grid = np.unique(grid)
return grid if grid.size > 0 else None
if k1 >= k0:
interior = dt * np.arange(k0, k1 + 1, dtype=float)
interior = interior[(interior >= (t0 - tol)) & (interior <= (t1 + tol))]
interior = np.clip(interior, t0, t1)
else:
interior = np.array([], dtype=float)
return np.unique(np.concatenate([[t0], interior, [t1]]))

def _dispatch_crossing_event(
self,
Expand Down Expand Up @@ -1461,13 +1462,36 @@ def _anim_frame(t: float, ss: Any, _dt: float = interactive_dt) -> None:
time.sleep(min(notebook_min_refresh_dt, 0.05))
if getattr(ss, "stop", None) is not None:
return
if t + _dt < tf - event_tol:
ss.declare_event(_anim_frame, t + _dt)
# Schedule the next anim_frame on the canonical k*_dt grid.
# `<= tf + tol` so a frame landing exactly on tf still fires.
next_t = (round(t / _dt) + 1) * _dt
if next_t <= tf + event_tol:
ss.declare_event(_anim_frame, next_t)

# Schedule frame callbacks for all animated runs; notebook mode
# relies on these callbacks to refresh inline figure output.
simstate.declare_event(_anim_frame, interactive_dt)

# Mirror the discrete-only t=0 pre-pass so hybrid diagrams capture
# the IC sample (the per-interval dedup skips result.t[0]=0).
if bd.nstates > 0:
simstate.t = 0.0
simstate.count += 1
eval_start = time.time()
bd.evaluate(bd.state_map(x0, simstate), 0.0, sinks=False)
simstate.bdtime += time.time() - eval_start
self._record_sample_and_service_hooks(
bd, simstate, 0.0, x0, stop_short_circuit=False,
)
# refresh() (present + grab_frame) only fires from _anim_frame
# at t=interactive_dt — call it once so the IC reaches the live
# window and the movie writer.
if (
simstate.options.animation
and simstate.display_manager is not None
):
simstate.display_manager.refresh()

while t0 < tf - event_tol:
# Next scheduled boundary (clock tick, explicit event, or terminal marker).
tnext, sources = simstate.eventq.pop(dt=1e-6)
Expand Down Expand Up @@ -1826,8 +1850,7 @@ def ydot(t: float, y: np.ndarray) -> np.ndarray:

if simstate.dt is not None:
t_eval = self._build_t_eval_grid(float(t0), float(t1), float(simstate.dt))
if t_eval is not None:
ivp_args.setdefault("t_eval", t_eval)
ivp_args.setdefault("t_eval", t_eval)

# Keep user-provided method if present; otherwise use option/default,
# then finally derive from run(..., solver=...).
Expand Down Expand Up @@ -1944,8 +1967,14 @@ def ydot(t: float, y: np.ndarray) -> np.ndarray:
crossing_state_map,
)

# return final continuous state and actual end time reached
t_final = float(result.t[-1]) if len(result.t) > 0 else float(t0)
# return final continuous state and actual end time reached.
# `result.t[-1]` is the last OUTPUT sample, not the integrator's
# reach; on natural completion it can undershoot t1 and trigger a
# spurious residual call. Use t1 directly when status == 0.
if result.status == 0:
t_final = float(t1)
else:
t_final = float(result.t[-1]) if len(result.t) > 0 else float(t0)
if len(result.t) > 0:
if crossing_handled:
x_final = bd.continuous_state_vector(crossing_state_map)
Expand Down
27 changes: 27 additions & 0 deletions tests/test_display_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,40 @@

matplotlib.use("Agg")

from unittest.mock import MagicMock

from bdsim.display import (
DisplayManager,
MatplotlibDisplayManager,
NotebookDisplayManager,
_grab_movie_frame,
)


def test_grab_movie_frame_no_writer_is_noop() -> None:
"""A figure with no `_bdsim_movie_writer` attribute is a silent no-op."""
fig = SimpleNamespace()
_grab_movie_frame(fig) # must not raise


def test_grab_movie_frame_with_writer_grabs_once() -> None:
"""When a writer is attached, `_grab_movie_frame` calls `grab_frame()` once."""
writer = MagicMock()
fig = SimpleNamespace(_bdsim_movie_writer=writer)
_grab_movie_frame(fig)
writer.grab_frame.assert_called_once()


def test_grab_movie_frame_swallows_attribute_error() -> None:
"""If the writer is broken (no grab_frame), the helper stays quiet."""
# Object without a grab_frame attribute raises AttributeError when called.
class BrokenWriter:
pass

fig = SimpleNamespace(_bdsim_movie_writer=BrokenWriter())
_grab_movie_frame(fig) # must not raise


def test_factory_returns_notebook_manager() -> None:
manager = DisplayManager.create(notebook_backend=True)
assert isinstance(manager, NotebookDisplayManager)
Expand Down
12 changes: 1 addition & 11 deletions tests/test_graphics.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,8 @@ def test_step_tiled_shared_figure_draws_once_per_time(self):
finally:
plt.close("all")

def test_step_movie_no_writer_attribute_error(self):
"""step() with movie set but no writer → AttributeError in grab_frame (lines 75-79)."""
gb = MinGB(nin=1, movie="out.mp4")
ss = _make_simstate(animation=False)
gb._simstate = ss
# self._writer never set → AttributeError from grab_frame except block → fatal() → AttributeError
with self.assertRaises(AttributeError):
gb.step(0.0, [1.0])

def test_step_timestamp_overlay_updates(self):
"""step() updates the timestamp text and then grabs a movie frame."""
"""step() updates the timestamp text artist."""
gb = MinGB(nin=1, movie="out.mp4", timestamp=True)
ss = _make_simstate(animation=False)
gb._simstate = ss
Expand All @@ -203,7 +194,6 @@ def test_step_timestamp_overlay_updates(self):
gb.step(1.2345, [1.0])
self.assertIsNotNone(gb._timestamp_artist)
self.assertEqual(gb._timestamp_artist.get_text(), "t=1.234")
gb.writer.grab_frame.assert_called_once()
finally:
plt.close("all")

Expand Down
79 changes: 79 additions & 0 deletions tests/test_run_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -1364,5 +1364,84 @@ def test_cli_set_empty_by_default(self):
self.assertEqual(opts.setglob, [])


# ---------------------------------------------------------------------------
class HybridSimSampleGridTest(unittest.TestCase):
"""Regression tests for the per-interval sample grid in hybrid runs.

Each test sets up a trivial `CONSTANT → INTEGRATOR` diagram with
animation enabled. The integrator gives the diagram continuous state
(so `_interval_hybrid` is exercised); the diagram itself is trivial
so the sample grid is determined entirely by `dt`, the anim_frame
cadence, and the integration loop's sample-logging behavior.
"""

@staticmethod
def _run(T: float, dt: float, fps: float, **run_kwargs):
sim = bdsim.BDSim(animation=True, animation_rate=fps, quiet=True)
bd = sim.blockdiagram()
bd.connect(bd.CONSTANT(1.0), bd.INTEGRATOR(0.0))
bd.compile()
return sim.run(bd, T=T, dt=dt, **run_kwargs)

def test_out_t_starts_at_zero(self):
"""The IC sample at t=0 lands in out.t."""
out = self._run(T=1.0, dt=1 / 30, fps=30)
self.assertEqual(float(out.t[0]), 0.0)

def test_no_drift_doubled_samples(self):
"""Past the FP drift threshold, out.t still has the expected inclusive
dt-grid count and contains no duplicate timestamps."""
out = self._run(T=3.5, dt=1 / 30, fps=30)
self.assertEqual(len(out.t), 106) # 3.5 * 30 + 1
self.assertGreater(float(np.diff(out.t).min()), 0.0)

def test_anim_frame_boundary_in_tlist(self):
"""Non-`dt`-aligned anim_frame boundaries land in out.t."""
out = self._run(T=1.0, dt=0.1, fps=3)
for boundary in (1 / 3, 2 / 3):
self.assertTrue(
np.any(np.isclose(out.t, boundary, atol=1e-9)),
f"expected t={boundary:.4f} in out.t, got {list(out.t)}",
)

def test_no_residual_sample_explosion(self):
"""Small `max_step` on a non-`dt`-aligned boundary does not dump every
integrator step into out.t."""
out = self._run(T=1.0, dt=0.1, fps=3, max_step=1e-3)
self.assertEqual(len(out.t), 13) # 11 dt-grid + 2 anim_frame boundaries

def test_no_empty_grid_sample_explosion(self):
"""An interval with no `k*dt` multiple inside (here the final
`[1/3, 0.34]`) does not dump every integrator step into out.t."""
out = self._run(T=0.34, dt=0.1, fps=3, max_step=1e-3)
self.assertEqual(len(out.t), 6) # 0, 0.1, 0.2, 0.3, 1/3, 0.34


# ---------------------------------------------------------------------------
class BuildTEvalGridTest(unittest.TestCase):
"""Unit tests for `BDSim._build_t_eval_grid`.

Contract: returns a non-empty `np.ndarray` containing exactly the endpoints
`t0` and `t1` plus any `k*dt` multiples strictly between them, sorted and
deduplicated.
"""

def test_endpoints_only_when_no_multiple_fits(self):
"""`t1 - t0 < dt` and no `k*dt` in `[t0, t1]` → grid is `[t0, t1]`."""
grid = BDSim._build_t_eval_grid(0.333, 0.34, 0.1)
np.testing.assert_allclose(grid, [0.333, 0.34])

def test_aligned_endpoints_no_duplicate(self):
"""`t0 == k*dt` and `t1 == (k+n)*dt` → clean dt-aligned sequence,
endpoint inclusion does not introduce duplicates."""
grid = BDSim._build_t_eval_grid(0.0, 0.3, 0.1)
np.testing.assert_allclose(grid, [0.0, 0.1, 0.2, 0.3])

def test_endpoints_with_interior_multiples(self):
"""Non-aligned endpoints with `k*dt` points between."""
grid = BDSim._build_t_eval_grid(0.123, 0.456, 0.1)
np.testing.assert_allclose(grid, [0.123, 0.2, 0.3, 0.4, 0.456])


if __name__ == "__main__":
unittest.main()