Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,16 @@ guidellm benchmark \
**Key parameters:**

- `--profile`: Defines the traffic pattern - options include `synchronous` (sequential requests), `concurrent` (parallel users), `throughput` (maximum capacity), `constant` (fixed requests/sec), `poisson` (randomized requests/sec), or `sweep` (automatic rate exploration)
- `--rate`: The numeric rate value whose meaning depends on profile - for `sweep` it's the number of benchmarks, for `concurrent` it's simultaneous requests, for `constant`/`poisson` it's requests per second

- `--rate`: The numeric rate value whose meaning depends on profile:

- `constant`/`poisson`: requests per second
- `concurrent`: number of simultaneous streams
- `sweep`: number of benchmarks (only first value used)
- `throughput`: max concurrency (only first value used)

For `constant`, `poisson`, and `concurrent`, multiple values can be specified (e.g., `--rate 1 --rate 5 --rate 10`). Values are sorted ascending, and if a failure constraint (over-saturation, errors) triggers at a given rate, remaining higher rates are skipped.

- `--max-seconds`: Maximum duration in seconds for each benchmark run (can also use `--max-requests` to limit by request count instead)

### Dataset Sources
Expand Down
10 changes: 8 additions & 2 deletions src/guidellm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,14 @@ def benchmark():
default=BenchmarkGenerativeTextArgs.get_default("rate"),
help=(
"Benchmark rate(s) to test. Meaning depends on profile: "
"sweep=number of benchmarks, concurrent=concurrent requests, "
"async/constant/poisson=requests per second."
"constant/poisson=requests per second, "
"concurrent=number of parallel streams, "
"sweep=number of benchmarks (only first value used), "
"throughput=max concurrency (only first value used). "
"For constant, poisson, and concurrent profiles, multiple values "
"can be specified (e.g., --rate 1 --rate 5 --rate 10), are sorted "
"ascending, and if a failure constraint (over-saturation, errors) "
"triggers at a given rate, higher rates are skipped."
),
)
# Backend configuration
Expand Down
95 changes: 83 additions & 12 deletions src/guidellm/benchmark/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from guidellm import settings
from guidellm.logger import logger
from guidellm.scheduler import (
AsyncConstantStrategy,
AsyncPoissonStrategy,
Expand Down Expand Up @@ -162,6 +163,33 @@ def strategy_types(self) -> list[str]:
"""
return [strat.type_ for strat in self.completed_strategies]

@staticmethod
def _should_stop_escalating(prev_benchmark: Benchmark) -> bool:
    """
    Report whether the previous benchmark ended because of a failure constraint.

    Looks up ``scheduler_state`` on the benchmark and walks its
    ``end_queuing_constraints`` mapping. The first entry whose action has
    ``request_processing == "stop_all"`` is logged and treated as a signal
    that the load could not be sustained (over-saturation, excessive errors,
    etc.). Entries with any other value, such as "stop_local" (max duration,
    max requests), are regarded as normal completions and are ignored.

    :param prev_benchmark: Benchmark instance; may lack a scheduler_state
        attribute, in which case no failure is reported
    :return: True if a "stop_all" constraint was found, False otherwise
    """
    state = getattr(prev_benchmark, "scheduler_state", None)
    if state is None:
        # Nothing to inspect, so there is no evidence of a failure.
        return False

    for constraint_name, outcome in state.end_queuing_constraints.items():
        if outcome.request_processing != "stop_all":
            continue

        # First failure-style constraint wins: log it and stop escalating.
        logger.info(
            f"Stopping rate escalation: constraint '{constraint_name}' "
            f"triggered (request_processing=stop_all)"
        )
        return True

    return False

def strategies_generator(
self,
) -> Generator[
Expand Down Expand Up @@ -362,7 +390,17 @@ def resolve_args(
"""
_ = (rate_type, random_seed) # unused
rate = rate if isinstance(rate, list) or rate is None else [rate]
kwargs["streams"] = [int(stream) for stream in rate] if rate else None
if rate:
streams = [int(stream) for stream in rate]
sorted_streams = sorted(streams)
if sorted_streams != streams:
logger.warning(
f"Streams reordered from {streams} to "
f"{sorted_streams} (ascending)"
)
kwargs["streams"] = sorted_streams
else:
kwargs["streams"] = None
return kwargs

@property
Expand All @@ -380,15 +418,22 @@ def next_strategy(
"""
Generate concurrent strategy for next stream count.

:param prev_strategy: Previously completed strategy (unused)
:param prev_benchmark: Benchmark results from previous execution (unused)
Stream counts are sorted ascending, so if a previous stream count was
terminated by a failure constraint (over-saturation, errors, etc.), all
remaining higher stream counts are skipped.

:param prev_strategy: Previously completed strategy
:param prev_benchmark: Benchmark results from previous execution
:return: ConcurrentStrategy with next stream count, or None if complete
or failure detected
"""
_ = (prev_strategy, prev_benchmark) # unused

_ = prev_strategy
if len(self.completed_strategies) >= len(self.streams):
return None

if prev_benchmark is not None and self._should_stop_escalating(prev_benchmark):
return None

return ConcurrentStrategy(
streams=self.streams[len(self.completed_strategies)],
rampup_duration=self.rampup_duration,
Expand Down Expand Up @@ -522,7 +567,13 @@ def resolve_args(
if rate_type in ["constant", "poisson"]
else kwargs.get("strategy_type", "constant")
)
kwargs["rate"] = rate if isinstance(rate, list) else [rate]
rate_list = rate if isinstance(rate, list) else [rate]
sorted_rates = sorted(rate_list)
if sorted_rates != rate_list:
logger.warning(
f"Rates reordered from {rate_list} to {sorted_rates} (ascending)"
)
kwargs["rate"] = sorted_rates
kwargs["random_seed"] = random_seed
return kwargs

Expand All @@ -542,17 +593,23 @@ def next_strategy(
"""
Generate async strategy for next configured rate.

:param prev_strategy: Previously completed strategy (unused)
:param prev_benchmark: Benchmark results from previous execution (unused)
Rates are sorted ascending, so if a previous rate was terminated by a
failure constraint (over-saturation, errors, etc.), all remaining higher
rates are skipped.

:param prev_strategy: Previously completed strategy
:param prev_benchmark: Benchmark results from previous execution
:return: AsyncConstantStrategy or AsyncPoissonStrategy for next rate,
or None if all rates completed
or None if all rates completed or failure detected
:raises ValueError: If strategy_type is neither 'constant' nor 'poisson'
"""
_ = (prev_strategy, prev_benchmark) # unused

_ = prev_strategy
if len(self.completed_strategies) >= len(self.rate):
return None

if prev_benchmark is not None and self._should_stop_escalating(prev_benchmark):
return None

current_rate = self.rate[len(self.completed_strategies)]

if self.strategy_type == "constant":
Expand Down Expand Up @@ -660,7 +717,9 @@ def next_strategy(
Generate next strategy in adaptive sweep sequence.

Executes synchronous and throughput strategies first to measure baseline
rates, then generates interpolated rates for async strategies.
rates, then generates interpolated rates for async strategies. If a
failure constraint is triggered during the async phase, all remaining
higher rates are skipped.

:param prev_strategy: Previously completed strategy instance
:param prev_benchmark: Benchmark results from previous strategy execution
Expand Down Expand Up @@ -692,6 +751,18 @@ def next_strategy(
self.sweep_size - 1,
)
)[1:] # don't rerun synchronous
# After throughput, fall through to async rate logic below.
# Don't check escalation since throughput is designed to push
# beyond sustainable load (over-saturation is expected).

# Stop escalation if a failure constraint was triggered during the async phase.
# The `!= "throughput"` condition below exempts the throughput benchmark,
# where over-saturation is expected rather than a failure signal.
# Synchronous never reaches here (returns ThroughputStrategy above).
if (
prev_strategy.type_ != "throughput"
and self._should_stop_escalating(prev_benchmark) # type: ignore[arg-type]
):
return None

next_index = (
len(self.completed_strategies) - 1 - 1
Expand Down
Loading