From 206c3bf6fe7d9815dd609eeedc7a652321ef93f8 Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Sun, 15 Feb 2026 13:56:30 +0200 Subject: [PATCH 1/6] Add cross-rate early exit for multi-rate benchmark profiles When running multiple rates (constant, poisson, concurrent profiles) or sweeping, stop escalating to higher rates if a failure constraint (over-saturation, max errors, error rate) triggers at a lower rate. - Sort rates/streams ascending in AsyncProfile and ConcurrentProfile - Add _should_stop_escalating() on base Profile class using stop_all as the failure signal (vs stop_local for normal completions) - Skip failure check after throughput phase in SweepProfile since over-saturation is expected at maximum load - Log warning when rate order is changed by sorting - Update CLI help and README with multi-rate documentation - Add comprehensive unit tests for all profile types Signed-off-by: Uri Shaket --- README.md | 8 +- src/guidellm/__main__.py | 10 +- src/guidellm/benchmark/profiles.py | 93 +++++- tests/unit/benchmark/test_profiles.py | 430 ++++++++++++++++++++++++++ 4 files changed, 526 insertions(+), 15 deletions(-) create mode 100644 tests/unit/benchmark/test_profiles.py diff --git a/README.md b/README.md index 299645dfc..d1c546a3c 100644 --- a/README.md +++ b/README.md @@ -174,7 +174,13 @@ guidellm benchmark \ **Key parameters:** - `--profile`: Defines the traffic pattern - options include `synchronous` (sequential requests), `concurrent` (parallel users), `throughput` (maximum capacity), `constant` (fixed requests/sec), `poisson` (randomized requests/sec), or `sweep` (automatic rate exploration) -- `--rate`: The numeric rate value whose meaning depends on profile - for `sweep` it's the number of benchmarks, for `concurrent` it's simultaneous requests, for `constant`/`poisson` it's requests per second +- `--rate`: The numeric rate value whose meaning depends on profile: + - `constant`/`poisson`: requests per second + - `concurrent`: number of simultaneous 
streams + - `sweep`: number of benchmarks (only first value used) + - `throughput`: max concurrency (only first value used) + + For `constant`, `poisson`, and `concurrent`, multiple values can be specified (e.g., `--rate 1 --rate 5 --rate 10`). Values are sorted ascending, and if a failure constraint (over-saturation, errors) triggers at a given rate, remaining higher rates are skipped. - `--max-seconds`: Maximum duration in seconds for each benchmark run (can also use `--max-requests` to limit by request count instead) ### Dataset Sources diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 7e9dab87f..13058fdbd 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -149,8 +149,14 @@ def benchmark(): default=BenchmarkGenerativeTextArgs.get_default("rate"), help=( "Benchmark rate(s) to test. Meaning depends on profile: " - "sweep=number of benchmarks, concurrent=concurrent requests, " - "async/constant/poisson=requests per second." + "constant/poisson=requests per second, " + "concurrent=number of parallel streams, " + "sweep=number of benchmarks (only first value used), " + "throughput=max concurrency (only first value used). " + "For constant, poisson, and concurrent profiles, multiple values " + "can be specified (e.g., --rate 1 --rate 5 --rate 10), are sorted " + "ascending, and if a failure constraint (over-saturation, errors) " + "triggers at a given rate, higher rates are skipped." 
), ) # Backend configuration diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 054356c10..6322924a6 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -27,6 +27,7 @@ ) from guidellm import settings +from guidellm.logger import logger from guidellm.scheduler import ( AsyncConstantStrategy, AsyncPoissonStrategy, @@ -162,6 +163,33 @@ def strategy_types(self) -> list[str]: """ return [strat.type_ for strat in self.completed_strategies] + @staticmethod + def _should_stop_escalating(prev_benchmark: Benchmark) -> bool: + """ + Check if a benchmark was terminated by a failure constraint. + + Inspects the scheduler state's end_queuing_constraints for any constraint + that used "stop_all" for request processing, which indicates the system + could not handle the load (over-saturation, excessive errors, etc.). + Constraints that use "stop_local" (max duration, max requests) are normal + completions and do not trigger escalation stops. 
+ + :param prev_benchmark: Benchmark instance with a scheduler_state attribute + :return: True if a failure constraint was triggered, False otherwise + """ + scheduler_state = getattr(prev_benchmark, "scheduler_state", None) + if scheduler_state is None: + return False + + for name, action in scheduler_state.end_queuing_constraints.items(): + if action.request_processing == "stop_all": + logger.info( + f"Stopping rate escalation: constraint '{name}' " + f"triggered (request_processing=stop_all)" + ) + return True + return False + def strategies_generator( self, ) -> Generator[ @@ -362,7 +390,17 @@ def resolve_args( """ _ = (rate_type, random_seed) # unused rate = rate if isinstance(rate, list) or rate is None else [rate] - kwargs["streams"] = [int(stream) for stream in rate] if rate else None + if rate: + streams = [int(stream) for stream in rate] + sorted_streams = sorted(streams) + if sorted_streams != streams: + logger.warning( + f"Streams reordered from {streams} to " + f"{sorted_streams} (ascending)" + ) + kwargs["streams"] = sorted_streams + else: + kwargs["streams"] = None return kwargs @property @@ -380,15 +418,21 @@ def next_strategy( """ Generate concurrent strategy for next stream count. - :param prev_strategy: Previously completed strategy (unused) - :param prev_benchmark: Benchmark results from previous execution (unused) + Stream counts are sorted ascending, so if a previous stream count was + terminated by a failure constraint (over-saturation, errors, etc.), all + remaining higher stream counts are skipped. 
+ + :param prev_strategy: Previously completed strategy + :param prev_benchmark: Benchmark results from previous execution :return: ConcurrentStrategy with next stream count, or None if complete + or failure detected """ - _ = (prev_strategy, prev_benchmark) # unused - if len(self.completed_strategies) >= len(self.streams): return None + if prev_benchmark is not None and self._should_stop_escalating(prev_benchmark): + return None + return ConcurrentStrategy( streams=self.streams[len(self.completed_strategies)], rampup_duration=self.rampup_duration, @@ -522,7 +566,13 @@ def resolve_args( if rate_type in ["constant", "poisson"] else kwargs.get("strategy_type", "constant") ) - kwargs["rate"] = rate if isinstance(rate, list) else [rate] + rate_list = rate if isinstance(rate, list) else [rate] + sorted_rates = sorted(rate_list) + if sorted_rates != rate_list: + logger.warning( + f"Rates reordered from {rate_list} to {sorted_rates} (ascending)" + ) + kwargs["rate"] = sorted_rates kwargs["random_seed"] = random_seed return kwargs @@ -542,17 +592,22 @@ def next_strategy( """ Generate async strategy for next configured rate. - :param prev_strategy: Previously completed strategy (unused) - :param prev_benchmark: Benchmark results from previous execution (unused) + Rates are sorted ascending, so if a previous rate was terminated by a + failure constraint (over-saturation, errors, etc.), all remaining higher + rates are skipped. 
+ + :param prev_strategy: Previously completed strategy + :param prev_benchmark: Benchmark results from previous execution :return: AsyncConstantStrategy or AsyncPoissonStrategy for next rate, - or None if all rates completed + or None if all rates completed or failure detected :raises ValueError: If strategy_type is neither 'constant' nor 'poisson' """ - _ = (prev_strategy, prev_benchmark) # unused - if len(self.completed_strategies) >= len(self.rate): return None + if prev_benchmark is not None and self._should_stop_escalating(prev_benchmark): + return None + current_rate = self.rate[len(self.completed_strategies)] if self.strategy_type == "constant": @@ -660,7 +715,9 @@ def next_strategy( Generate next strategy in adaptive sweep sequence. Executes synchronous and throughput strategies first to measure baseline - rates, then generates interpolated rates for async strategies. + rates, then generates interpolated rates for async strategies. If a + failure constraint is triggered during the async phase, all remaining + higher rates are skipped. :param prev_strategy: Previously completed strategy instance :param prev_benchmark: Benchmark results from previous strategy execution @@ -692,6 +749,18 @@ def next_strategy( self.sweep_size - 1, ) )[1:] # don't rerun synchronous + # After throughput, fall through to async rate logic below. + # Don't check escalation since throughput is designed to push + # beyond sustainable load (over-saturation is expected). + + # Stop escalation if a failure constraint was triggered. + # A throughput result skips this check via the != "throughput" guard below. + # Synchronous never reaches here (returns ThroughputStrategy above). 
+ if ( + prev_strategy.type_ != "throughput" + and self._should_stop_escalating(prev_benchmark) + ): + return None next_index = ( len(self.completed_strategies) - 1 - 1 diff --git a/tests/unit/benchmark/test_profiles.py b/tests/unit/benchmark/test_profiles.py new file mode 100644 index 000000000..9127f5c17 --- /dev/null +++ b/tests/unit/benchmark/test_profiles.py @@ -0,0 +1,430 @@ +""" +Tests for cross-rate early exit behavior in AsyncProfile, ConcurrentProfile, +and SweepProfile. + +Validates that: +- AsyncProfile and ConcurrentProfile sort rates/streams ascending +- Failure constraints (stop_all) stop rate escalation +- Normal completions (stop_local) do not stop rate escalation +- SweepProfile stops escalation during the async phase but not during throughput +""" + +from types import SimpleNamespace + +import pytest + +from guidellm.benchmark.profiles import ( + AsyncProfile, + ConcurrentProfile, + Profile, + SweepProfile, +) +from guidellm.scheduler import ( + AsyncConstantStrategy, + AsyncPoissonStrategy, + ConcurrentStrategy, + SchedulerState, + SchedulerUpdateAction, + SynchronousStrategy, +) + + +# ============================================================================ +# Helpers +# ============================================================================ + + +def _make_mock_benchmark( + end_queuing_constraints: dict[str, SchedulerUpdateAction] | None = None, + request_throughput_mean: float = 10.0, +): + """ + Create a lightweight mock benchmark with a scheduler_state. + + Uses SimpleNamespace to avoid constructing a full GenerativeBenchmark, + which requires many nested fields. 
+ """ + state = SchedulerState() + if end_queuing_constraints: + state.end_queuing_constraints = end_queuing_constraints + + throughput = SimpleNamespace( + successful=SimpleNamespace(mean=request_throughput_mean), + ) + + return SimpleNamespace( + scheduler_state=state, + request_throughput=throughput, + ) + + +def _make_failure_action(name: str = "test_constraint") -> SchedulerUpdateAction: + """Create a SchedulerUpdateAction with stop_all (failure).""" + return SchedulerUpdateAction( + request_queuing="stop", + request_processing="stop_all", + metadata={f"{name}_triggered": True}, + ) + + +def _make_normal_completion_action() -> SchedulerUpdateAction: + """Create a SchedulerUpdateAction with stop_local (normal completion).""" + return SchedulerUpdateAction( + request_queuing="stop", + request_processing="stop_local", + metadata={"duration_exceeded": True}, + ) + + +def _make_failure_benchmark(constraint_name: str = "over_saturation"): + """Create a mock benchmark terminated by a stop_all constraint.""" + return _make_mock_benchmark( + end_queuing_constraints={ + constraint_name: _make_failure_action(constraint_name), + } + ) + + +def _make_normal_benchmark(): + """Create a mock benchmark that completed normally (stop_local).""" + return _make_mock_benchmark( + end_queuing_constraints={ + "max_duration": _make_normal_completion_action(), + } + ) + + +def _advance_sweep_to_async_phase(profile: SweepProfile, sync_rate=2.0, tp_rate=10.0): + """ + Advance a SweepProfile through sync and throughput phases, returning + the first async strategy. Mutates profile.completed_strategies. + + Follows the same protocol as strategies_generator: each strategy is + appended to completed_strategies before the next next_strategy call. 
+ """ + # Phase 1: synchronous + sync_strat = SynchronousStrategy() + sync_benchmark = _make_mock_benchmark(request_throughput_mean=sync_rate) + throughput_strat = profile.next_strategy(sync_strat, sync_benchmark) + profile.completed_strategies.append(sync_strat) + + # Phase 2: throughput + throughput_benchmark = _make_mock_benchmark(request_throughput_mean=tp_rate) + profile.completed_strategies.append(throughput_strat) + first_async_strat = profile.next_strategy(throughput_strat, throughput_benchmark) + + return first_async_strat, throughput_strat + + +# ============================================================================ +# Profile._should_stop_escalating tests +# ============================================================================ + + +class TestShouldStopEscalating: + """Tests for the shared Profile._should_stop_escalating static method.""" + + def test_stop_all_triggers_escalation_stop(self): + """A constraint with request_processing=stop_all should trigger stop.""" + benchmark = _make_failure_benchmark("over_saturation") + assert Profile._should_stop_escalating(benchmark) is True + + def test_stop_local_does_not_trigger(self): + """A constraint with request_processing=stop_local should not trigger.""" + benchmark = _make_normal_benchmark() + assert Profile._should_stop_escalating(benchmark) is False + + def test_no_constraints(self): + """No constraints triggered means no stop.""" + benchmark = _make_mock_benchmark(end_queuing_constraints={}) + assert Profile._should_stop_escalating(benchmark) is False + + def test_no_scheduler_state(self): + """Benchmark without scheduler_state should not stop.""" + benchmark = SimpleNamespace() + assert Profile._should_stop_escalating(benchmark) is False + + def test_mixed_constraints_with_one_stop_all(self): + """If multiple constraints present and one is stop_all, should stop.""" + benchmark = _make_mock_benchmark( + end_queuing_constraints={ + "max_duration": _make_normal_completion_action(), + 
"over_saturation": _make_failure_action("over_saturation"), + } + ) + assert Profile._should_stop_escalating(benchmark) is True + + def test_multiple_stop_local_does_not_trigger(self): + """Multiple stop_local constraints should not trigger stop.""" + benchmark = _make_mock_benchmark( + end_queuing_constraints={ + "max_duration": _make_normal_completion_action(), + "max_requests": _make_normal_completion_action(), + } + ) + assert Profile._should_stop_escalating(benchmark) is False + + +# ============================================================================ +# Rate/stream sorting tests (parametrized across AsyncProfile & ConcurrentProfile) +# ============================================================================ + + +@pytest.mark.parametrize( + "profile_cls, rate_type, unsorted_input, sorted_output, output_key", + [ + (AsyncProfile, "constant", [50.0, 10.0, 1.0, 25.0], [1.0, 10.0, 25.0, 50.0], "rate"), + (AsyncProfile, "constant", [1.0, 5.0, 10.0], [1.0, 5.0, 10.0], "rate"), + (AsyncProfile, "constant", [5.0], [5.0], "rate"), + (ConcurrentProfile, "concurrent", [16.0, 4.0, 1.0, 8.0], [1, 4, 8, 16], "streams"), + (ConcurrentProfile, "concurrent", [2.0, 4.0, 8.0], [2, 4, 8], "streams"), + (ConcurrentProfile, "concurrent", [4.0], [4], "streams"), + ], + ids=[ + "async-unsorted", "async-sorted", "async-single", + "concurrent-unsorted", "concurrent-sorted", "concurrent-single", + ], +) +class TestRateSorting: + """Rates and streams should be sorted ascending in resolve_args.""" + + def test_sorting(self, profile_cls, rate_type, unsorted_input, sorted_output, output_key): + resolved = profile_cls.resolve_args( + rate_type=rate_type, + rate=unsorted_input, + random_seed=42, + ) + assert resolved[output_key] == sorted_output + + +# ============================================================================ +# Cross-rate early exit tests (parametrized across AsyncProfile & ConcurrentProfile) +# 
============================================================================ + + +class TestMultiRateProfileEarlyExit: + """ + Tests for next_strategy() cross-rate early exit, parametrized across + AsyncProfile and ConcurrentProfile which share the same behavior. + """ + + @staticmethod + def _make_async_profile(): + profile = AsyncProfile(type_="constant", strategy_type="constant", rate=[1.0, 5.0, 10.0]) + first = AsyncConstantStrategy(rate=1.0) + return profile, first + + @staticmethod + def _make_concurrent_profile(): + profile = ConcurrentProfile(streams=[2, 4, 8]) + first = ConcurrentStrategy(streams=2) + return profile, first + + @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + def test_normal_completion_continues(self, make_profile): + """After normal completion (stop_local), should advance to next rate.""" + profile, first_strategy = getattr(self, make_profile)() + profile.completed_strategies.append(first_strategy) + + next_strat = profile.next_strategy(first_strategy, _make_normal_benchmark()) + + assert next_strat is not None + + @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + def test_failure_stops(self, make_profile): + """After failure (stop_all), should return None.""" + profile, first_strategy = getattr(self, make_profile)() + profile.completed_strategies.append(first_strategy) + + next_strat = profile.next_strategy( + first_strategy, _make_failure_benchmark() + ) + + assert next_strat is None + + @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + def test_first_rate_always_runs(self, make_profile): + """First rate should always run (no previous benchmark).""" + profile, _ = getattr(self, make_profile)() + + next_strat = profile.next_strategy(None, None) + + assert next_strat is not None + + @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + def 
test_all_rates_completed_returns_none(self, make_profile): + """When all rates are done, should return None regardless.""" + profile, first_strategy = getattr(self, make_profile)() + for _ in range(len(getattr(profile, "rate", None) or profile.streams)): + profile.completed_strategies.append(first_strategy) + + next_strat = profile.next_strategy(first_strategy, _make_mock_benchmark()) + + assert next_strat is None + + def test_middle_rate_failure_skips_remaining(self): + """Rate 1 succeeds, rate 2 succeeds, rate 3 fails, rate 4 is skipped. + + This is the core use case: the system handles low rates fine but + fails at a mid-range rate, and higher rates are not attempted. + """ + profile = AsyncProfile( + type_="constant", + strategy_type="constant", + rate=[1.0, 5.0, 10.0, 50.0], + ) + + # Rate 1 (1.0 RPS): succeeds + strat_1 = profile.next_strategy(None, None) + assert strat_1 is not None + assert strat_1.rate == 1.0 + profile.completed_strategies.append(strat_1) + + # Rate 2 (5.0 RPS): succeeds + strat_2 = profile.next_strategy(strat_1, _make_normal_benchmark()) + assert strat_2 is not None + assert strat_2.rate == 5.0 + profile.completed_strategies.append(strat_2) + + # Rate 3 (10.0 RPS): runs + strat_3 = profile.next_strategy(strat_2, _make_normal_benchmark()) + assert strat_3 is not None + assert strat_3.rate == 10.0 + profile.completed_strategies.append(strat_3) + + # Rate 4 (50.0 RPS): should be skipped because rate 3 failed + strat_4 = profile.next_strategy(strat_3, _make_failure_benchmark()) + assert strat_4 is None + + def test_poisson_strategy_early_exit(self): + """Poisson strategy type should work through the same early-exit path.""" + profile = AsyncProfile( + type_="poisson", + strategy_type="poisson", + rate=[1.0, 5.0, 10.0], + random_seed=42, + ) + + strat_1 = profile.next_strategy(None, None) + assert strat_1 is not None + assert isinstance(strat_1, AsyncPoissonStrategy) + assert strat_1.rate == 1.0 + profile.completed_strategies.append(strat_1) + 
+ # First rate fails -> second rate skipped + strat_2 = profile.next_strategy(strat_1, _make_failure_benchmark()) + assert strat_2 is None + + +# ============================================================================ +# SweepProfile cross-rate early exit tests +# ============================================================================ + + +class TestSweepProfileEarlyExit: + """Tests for SweepProfile.next_strategy() cross-rate early exit.""" + + def _make_profile(self, sweep_size: int = 5) -> SweepProfile: + return SweepProfile(sweep_size=sweep_size, strategy_type="constant") + + def test_sync_and_throughput_always_run(self): + """Synchronous and throughput phases should always run.""" + profile = self._make_profile() + + strat = profile.next_strategy(None, None) + assert strat is not None + assert strat.type_ == "synchronous" + + def test_throughput_runs_after_sync_with_failure(self): + """Throughput always runs after sync (sync returns early before check).""" + profile = self._make_profile() + + sync_strategy = SynchronousStrategy() + sync_benchmark = _make_mock_benchmark( + request_throughput_mean=5.0, + end_queuing_constraints={ + "over_saturation": _make_failure_action("over_saturation"), + }, + ) + + strat = profile.next_strategy(sync_strategy, sync_benchmark) + assert strat is not None + assert strat.type_ == "throughput" + + def test_throughput_failure_does_not_stop(self): + """No failure during throughput should stop the sweep. + + Throughput pushes the system to its limit by design, so all failure + checks are skipped after the throughput phase. 
+ """ + profile = self._make_profile(sweep_size=5) + + sync_strat = SynchronousStrategy() + sync_benchmark = _make_mock_benchmark(request_throughput_mean=2.0) + throughput_strat = profile.next_strategy(sync_strat, sync_benchmark) + profile.completed_strategies.append(sync_strat) + + throughput_benchmark = _make_mock_benchmark( + request_throughput_mean=10.0, + end_queuing_constraints={ + "over_saturation": _make_failure_action("over_saturation"), + }, + ) + profile.completed_strategies.append(throughput_strat) + first_async_strat = profile.next_strategy(throughput_strat, throughput_benchmark) + + assert first_async_strat is not None + + def test_async_phase_stops_on_failure(self): + """During async phase, stop_all constraint should stop remaining rates.""" + profile = self._make_profile(sweep_size=5) + first_async_strat, _ = _advance_sweep_to_async_phase(profile) + assert first_async_strat is not None + + profile.completed_strategies.append(first_async_strat) + next_strat = profile.next_strategy( + first_async_strat, _make_failure_benchmark() + ) + + assert next_strat is None + + def test_async_phase_continues_on_normal_completion(self): + """During async phase, stop_local should advance to next rate.""" + profile = self._make_profile(sweep_size=5) + first_async_strat, _ = _advance_sweep_to_async_phase(profile) + assert first_async_strat is not None + + profile.completed_strategies.append(first_async_strat) + next_strat = profile.next_strategy( + first_async_strat, _make_normal_benchmark() + ) + + assert next_strat is not None + + def test_sweep_size_2_no_async_phase(self): + """With sweep_size=2, only sync + throughput run; no async phase. + + Verifies the escalation check is never reached and the profile + completes cleanly without generating any async rates. 
+ """ + profile = self._make_profile(sweep_size=2) + + # Phase 1: synchronous + sync_strat = profile.next_strategy(None, None) + assert sync_strat is not None + assert sync_strat.type_ == "synchronous" + profile.completed_strategies.append(sync_strat) + + # Phase 2: throughput + sync_benchmark = _make_mock_benchmark(request_throughput_mean=5.0) + throughput_strat = profile.next_strategy(sync_strat, sync_benchmark) + assert throughput_strat is not None + assert throughput_strat.type_ == "throughput" + profile.completed_strategies.append(throughput_strat) + + # Phase 3: should be None — no async rates generated + throughput_benchmark = _make_mock_benchmark(request_throughput_mean=10.0) + next_strat = profile.next_strategy(throughput_strat, throughput_benchmark) + + assert profile.measured_rates == [] + assert next_strat is None From 39b9f51ebf7308c6f4b00a2211906af1d11ed0bc Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Mon, 23 Feb 2026 16:21:51 +0200 Subject: [PATCH 2/6] add note Signed-off-by: Uri Shaket --- tests/unit/benchmark/test_profiles.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/benchmark/test_profiles.py b/tests/unit/benchmark/test_profiles.py index 9127f5c17..a408217f9 100644 --- a/tests/unit/benchmark/test_profiles.py +++ b/tests/unit/benchmark/test_profiles.py @@ -2,6 +2,8 @@ Tests for cross-rate early exit behavior in AsyncProfile, ConcurrentProfile, and SweepProfile. 
+## WRITTEN BY AI ## + Validates that: - AsyncProfile and ConcurrentProfile sort rates/streams ascending - Failure constraints (stop_all) stop rate escalation From 4ef5ffbfc6859bd513b46ef1a75d6d395bea4dd4 Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Mon, 23 Feb 2026 22:26:43 +0200 Subject: [PATCH 3/6] fix pre-commit Signed-off-by: Uri Shaket --- README.md | 3 ++ src/guidellm/benchmark/profiles.py | 2 ++ tests/unit/benchmark/test_profiles.py | 52 ++++++++++++++++++++------- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index d1c546a3c..8abdede4a 100644 --- a/README.md +++ b/README.md @@ -174,13 +174,16 @@ guidellm benchmark \ **Key parameters:** - `--profile`: Defines the traffic pattern - options include `synchronous` (sequential requests), `concurrent` (parallel users), `throughput` (maximum capacity), `constant` (fixed requests/sec), `poisson` (randomized requests/sec), or `sweep` (automatic rate exploration) + - `--rate`: The numeric rate value whose meaning depends on profile: + - `constant`/`poisson`: requests per second - `concurrent`: number of simultaneous streams - `sweep`: number of benchmarks (only first value used) - `throughput`: max concurrency (only first value used) For `constant`, `poisson`, and `concurrent`, multiple values can be specified (e.g., `--rate 1 --rate 5 --rate 10`). Values are sorted ascending, and if a failure constraint (over-saturation, errors) triggers at a given rate, remaining higher rates are skipped. 
+ - `--max-seconds`: Maximum duration in seconds for each benchmark run (can also use `--max-requests` to limit by request count instead) ### Dataset Sources diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 6322924a6..9244e1094 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -427,6 +427,7 @@ def next_strategy( :return: ConcurrentStrategy with next stream count, or None if complete or failure detected """ + _ = prev_strategy if len(self.completed_strategies) >= len(self.streams): return None @@ -602,6 +603,7 @@ def next_strategy( or None if all rates completed or failure detected :raises ValueError: If strategy_type is neither 'constant' nor 'poisson' """ + _ = prev_strategy if len(self.completed_strategies) >= len(self.rate): return None diff --git a/tests/unit/benchmark/test_profiles.py b/tests/unit/benchmark/test_profiles.py index a408217f9..c1489d240 100644 --- a/tests/unit/benchmark/test_profiles.py +++ b/tests/unit/benchmark/test_profiles.py @@ -30,6 +30,10 @@ SynchronousStrategy, ) +MULTI_PROFILE_FACTORIES = ( + "_make_async_profile", + "_make_concurrent_profile", +) # ============================================================================ # Helpers @@ -173,24 +177,42 @@ def test_multiple_stop_local_does_not_trigger(self): @pytest.mark.parametrize( - "profile_cls, rate_type, unsorted_input, sorted_output, output_key", + ("profile_cls", "rate_type", "unsorted_input", "sorted_output", "output_key"), [ - (AsyncProfile, "constant", [50.0, 10.0, 1.0, 25.0], [1.0, 10.0, 25.0, 50.0], "rate"), + ( + AsyncProfile, + "constant", + [50.0, 10.0, 1.0, 25.0], + [1.0, 10.0, 25.0, 50.0], + "rate", + ), (AsyncProfile, "constant", [1.0, 5.0, 10.0], [1.0, 5.0, 10.0], "rate"), (AsyncProfile, "constant", [5.0], [5.0], "rate"), - (ConcurrentProfile, "concurrent", [16.0, 4.0, 1.0, 8.0], [1, 4, 8, 16], "streams"), + ( + ConcurrentProfile, + "concurrent", + [16.0, 4.0, 1.0, 8.0], + [1, 4, 8, 
16], + "streams", + ), (ConcurrentProfile, "concurrent", [2.0, 4.0, 8.0], [2, 4, 8], "streams"), (ConcurrentProfile, "concurrent", [4.0], [4], "streams"), ], ids=[ - "async-unsorted", "async-sorted", "async-single", - "concurrent-unsorted", "concurrent-sorted", "concurrent-single", + "async-unsorted", + "async-sorted", + "async-single", + "concurrent-unsorted", + "concurrent-sorted", + "concurrent-single", ], ) class TestRateSorting: """Rates and streams should be sorted ascending in resolve_args.""" - def test_sorting(self, profile_cls, rate_type, unsorted_input, sorted_output, output_key): + def test_sorting( + self, profile_cls, rate_type, unsorted_input, sorted_output, output_key + ): resolved = profile_cls.resolve_args( rate_type=rate_type, rate=unsorted_input, @@ -212,7 +234,11 @@ class TestMultiRateProfileEarlyExit: @staticmethod def _make_async_profile(): - profile = AsyncProfile(type_="constant", strategy_type="constant", rate=[1.0, 5.0, 10.0]) + profile = AsyncProfile( + type_="constant", + strategy_type="constant", + rate=[1.0, 5.0, 10.0], + ) first = AsyncConstantStrategy(rate=1.0) return profile, first @@ -222,7 +248,7 @@ def _make_concurrent_profile(): first = ConcurrentStrategy(streams=2) return profile, first - @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + @pytest.mark.parametrize("make_profile", MULTI_PROFILE_FACTORIES) def test_normal_completion_continues(self, make_profile): """After normal completion (stop_local), should advance to next rate.""" profile, first_strategy = getattr(self, make_profile)() @@ -232,7 +258,7 @@ def test_normal_completion_continues(self, make_profile): assert next_strat is not None - @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + @pytest.mark.parametrize("make_profile", MULTI_PROFILE_FACTORIES) def test_failure_stops(self, make_profile): """After failure (stop_all), should return None.""" profile, first_strategy = 
getattr(self, make_profile)() @@ -244,7 +270,7 @@ def test_failure_stops(self, make_profile): assert next_strat is None - @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + @pytest.mark.parametrize("make_profile", MULTI_PROFILE_FACTORIES) def test_first_rate_always_runs(self, make_profile): """First rate should always run (no previous benchmark).""" profile, _ = getattr(self, make_profile)() @@ -253,7 +279,7 @@ def test_first_rate_always_runs(self, make_profile): assert next_strat is not None - @pytest.mark.parametrize("make_profile", ["_make_async_profile", "_make_concurrent_profile"]) + @pytest.mark.parametrize("make_profile", MULTI_PROFILE_FACTORIES) def test_all_rates_completed_returns_none(self, make_profile): """When all rates are done, should return None regardless.""" profile, first_strategy = getattr(self, make_profile)() @@ -373,7 +399,9 @@ def test_throughput_failure_does_not_stop(self): }, ) profile.completed_strategies.append(throughput_strat) - first_async_strat = profile.next_strategy(throughput_strat, throughput_benchmark) + first_async_strat = profile.next_strategy( + throughput_strat, throughput_benchmark + ) assert first_async_strat is not None From 2538f49d5184eb827a923d5c2090e23ddb56402c Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Tue, 24 Feb 2026 15:43:12 +0200 Subject: [PATCH 4/6] add ignore[arg-type] None type is checked earlier in the flow Signed-off-by: Uri Shaket --- src/guidellm/benchmark/profiles.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 9244e1094..c88252fde 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -164,7 +164,7 @@ def strategy_types(self) -> list[str]: return [strat.type_ for strat in self.completed_strategies] @staticmethod - def _should_stop_escalating(prev_benchmark: Benchmark) -> bool: + def 
_should_stop_escalating(prev_benchmark: Benchmark | None) -> bool: """ Check if a benchmark was terminated by a failure constraint. @@ -174,7 +174,8 @@ def _should_stop_escalating(prev_benchmark: Benchmark) -> bool: Constraints that use "stop_local" (max duration, max requests) are normal completions and do not trigger escalation stops. - :param prev_benchmark: Benchmark instance with a scheduler_state attribute + :param prev_benchmark: Benchmark instance with a scheduler_state + attribute, or None :return: True if a failure constraint was triggered, False otherwise """ scheduler_state = getattr(prev_benchmark, "scheduler_state", None) From 7823e0b17cc0e056399d5ede2b16aafc2a1426a4 Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Tue, 24 Feb 2026 17:04:54 +0200 Subject: [PATCH 5/6] add ignore[arg-type] None type is checked earlier in the flow Signed-off-by: Uri Shaket --- src/guidellm/benchmark/profiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index c88252fde..97d9c16bb 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -761,7 +761,7 @@ def next_strategy( # Synchronous never reaches here (returns ThroughputStrategy above). if ( prev_strategy.type_ != "throughput" - and self._should_stop_escalating(prev_benchmark) + and self._should_stop_escalating(prev_benchmark) # type: ignore[arg-type] ): return None From 6ffc31d745cd6ded98a889e1ec81ed255e639c9d Mon Sep 17 00:00:00 2001 From: Uri Shaket Date: Tue, 24 Feb 2026 17:06:18 +0200 Subject: [PATCH 6/6] Revert "add ignore[arg-type] None type is checked earlier in the flow" This reverts commit 2538f49d5184eb827a923d5c2090e23ddb56402c. 
Signed-off-by: Uri Shaket --- src/guidellm/benchmark/profiles.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 97d9c16bb..33ce042fe 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -164,7 +164,7 @@ def strategy_types(self) -> list[str]: return [strat.type_ for strat in self.completed_strategies] @staticmethod - def _should_stop_escalating(prev_benchmark: Benchmark | None) -> bool: + def _should_stop_escalating(prev_benchmark: Benchmark) -> bool: """ Check if a benchmark was terminated by a failure constraint. @@ -174,8 +174,7 @@ def _should_stop_escalating(prev_benchmark: Benchmark | None) -> bool: Constraints that use "stop_local" (max duration, max requests) are normal completions and do not trigger escalation stops. - :param prev_benchmark: Benchmark instance with a scheduler_state - attribute, or None + :param prev_benchmark: Benchmark instance with a scheduler_state attribute :return: True if a failure constraint was triggered, False otherwise """ scheduler_state = getattr(prev_benchmark, "scheduler_state", None)