From a97d7ed6b30171ba1ee496008a773c4968db9776 Mon Sep 17 00:00:00 2001 From: Eugene Rizhkov Date: Mon, 15 Jun 2026 16:10:50 +0300 Subject: [PATCH 1/3] Add track parameter validation hook --- docs/advanced.rst | 32 +++++++++++++++ esrally/racecontrol.py | 4 ++ esrally/track/loader.py | 20 ++++++++-- esrally/track/params.py | 32 +++++++++++++++ tests/racecontrol_test.py | 72 ++++++++++++++++++++++++++++++++++ tests/track/loader_test.py | 49 ++++++++++++++++++++++- tests/track/params_test.py | 79 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 284 insertions(+), 4 deletions(-) diff --git a/docs/advanced.rst b/docs/advanced.rst index 5f0d6bf41..11a39e916 100644 --- a/docs/advanced.rst +++ b/docs/advanced.rst @@ -276,6 +276,38 @@ Custom parameter sources can use the Python standard API but using any additiona You can also implement your parameter sources and runners in multiple Python files but the main entry point is always ``track.py``. The root package name of your plugin is the name of your track. +.. _adding_tracks_custom_validators: + +Validating Track Parameters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Tracks that rely on parameters supplied via ``--track-params`` (or a parameters file) often need those parameters to satisfy certain constraints. Rather than failing deep inside a parameter source or runner with a confusing error, you can register a validator that Rally invokes for the selected challenge before the benchmark starts. This lets you reject invalid parameters up front with a clear, actionable message. + +A validator is any callable that accepts the resolved track parameters and raises ``esrally.exceptions.TrackConfigError`` if they are invalid:: + + from esrally.exceptions import TrackConfigError + + + def validate_autoscaling(params): + phases = params.get("autoscaling_phases", []) + scheduling = params.get("scheduling", []) + if len(scheduling) not in (1, len(phases)): + raise TrackConfigError( + f"'scheduling' must contain either a single element or one element per phase " + f"({len(phases)}) but had {len(scheduling)}." + ) + + + def register(registry): + registry.register_validator("autoscaling", validate_autoscaling) + +Note the following: + +* The first argument to ``register_validator`` is the **challenge name**. Rally only invokes validators registered for the challenge that is selected for the benchmark. +* The validator receives the track parameters as a dictionary (the values supplied via ``--track-params`` or a parameters file). +* Raise ``TrackConfigError`` to abort the benchmark. Rally runs the validators after loading the track and selecting the challenge, but before provisioning nodes or starting the benchmark, so a misconfigured parameter is reported without waiting for the engine to start. +* You may register more than one validator for the same challenge; Rally invokes them in registration order. + .. _adding_tracks_custom_runners: Creating Your Own Operations With Custom Runners diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index ef71931e6..065ea52ff 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -39,6 +39,7 @@ types, version, ) +from esrally.track import params from esrally.utils import console, opts, versions pipelines = collections.OrderedDict() @@ -242,6 +243,9 @@ def setup(self, sources=False): self.current_track.name, challenge_name, PROGRAM_NAME ) ) + # Validate track parameters for the selected challenge before provisioning the engine so that invalid + # parameters fail fast. Track plugins were loaded (and any validators registered) during load_track above. + params.invoke_validators(self.current_challenge.name, self.cfg.opts("track", "params", mandatory=False, default_value={})) if self.current_challenge.user_info: console.info(self.current_challenge.user_info) for message in self.current_challenge.serverless_info: diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 5d9ffc049..e537289fe 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -276,7 +276,12 @@ def _load_single_track(cfg: types.Config, track_repository, track_name, install_ tpr = TrackProcessorRegistry(cfg) if install_dependencies: _install_dependencies(current_track.dependencies) - has_plugins = load_track_plugins(cfg, track_name, register_track_processor=tpr.register_track_processor) + has_plugins = load_track_plugins( + cfg, + track_name, + register_track_processor=tpr.register_track_processor, + register_validator=params.register_validator, + ) current_track.has_plugins = has_plugins for processor in tpr.processors: processor.on_after_load_track(current_track) @@ -298,6 +303,7 @@ def load_track_plugins( register_scheduler=None, register_track_processor=None, force_update=False, + register_validator=None, ): """ Loads plugins that are defined for the current track (as specified by the configuration). @@ -309,12 +315,13 @@ def load_track_plugins( :param register_track_processor: An optional function where track processors can be registered. :param force_update: If set to ``True`` this ensures that the track is first updated from the remote repository. Defaults to ``False``. + :param register_validator: An optional function where challenge validators can be registered. :return: True iff this track defines plugins and they have been loaded. """ repo = track_repo(cfg, fetch=force_update, update=force_update) track_plugin_path = repo.track_dir(track_name) LOG.debug("Invoking plugin_reader with name [%s] resolved to path [%s]", track_name, track_plugin_path) - plugin_reader = TrackPluginReader(track_plugin_path, register_runner, register_scheduler, register_track_processor) + plugin_reader = TrackPluginReader(track_plugin_path, register_runner, register_scheduler, register_track_processor, register_validator) if plugin_reader.can_load(): plugin_reader.load() @@ -1272,10 +1279,13 @@ class TrackPluginReader: Loads track plugins """ - def __init__(self, track_plugin_path, runner_registry=None, scheduler_registry=None, track_processor_registry=None): + def __init__( + self, track_plugin_path, runner_registry=None, scheduler_registry=None, track_processor_registry=None, validator_registry=None + ): self.runner_registry = runner_registry self.scheduler_registry = scheduler_registry self.track_processor_registry = track_processor_registry + self.validator_registry = validator_registry self.loader = modules.ComponentLoader(root_path=track_plugin_path, component_entry_point="track") def can_load(self): @@ -1310,6 +1320,10 @@ def register_track_processor(self, track_processor): if self.track_processor_registry: self.track_processor_registry(track_processor) + def register_validator(self, challenge_name, fn): + if self.validator_registry: + self.validator_registry(challenge_name, fn) + @property def meta_data(self): return { diff --git a/esrally/track/params.py b/esrally/track/params.py index 3f92936aa..e62a169c9 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -35,6 +35,7 @@ __PARAM_SOURCES_BY_OP: dict[track.OperationType, ParamSource] = {} __PARAM_SOURCES_BY_NAME: dict[str, ParamSource] = {} +__VALIDATORS_BY_CHALLENGE: dict[str, list[Callable]] = collections.defaultdict(list) def param_source_for_operation(op_type, track, params, task_name): @@ -69,6 +70,37 @@ def register_param_source_for_name(name, param_source_class): __PARAM_SOURCES_BY_NAME[name] = param_source_class +def register_validator(challenge_name: str, fn: Callable) -> None: + if not callable(fn): + raise exceptions.RallyAssertionError(f"Validator for challenge [{challenge_name}] must be callable but was [{fn}].") + # Registration is idempotent per function: a track's plugins may be (re)loaded more than once within a process + # and we must not invoke the same validator multiple times. + if fn not in __VALIDATORS_BY_CHALLENGE[challenge_name]: + __VALIDATORS_BY_CHALLENGE[challenge_name].append(fn) + + +def invoke_validators(challenge_name: str, track_params: dict) -> None: + for validator in __VALIDATORS_BY_CHALLENGE.get(challenge_name, []): + try: + validator(track_params or {}) + except exceptions.TrackConfigError: + # raised intentionally by the validator to signal invalid parameters; propagate its message as-is + raise + except Exception as e: + validator_name = getattr(validator, "__name__", repr(validator)) + raise exceptions.TrackConfigError(f"Validator [{validator_name}] for challenge [{challenge_name}] failed: {e}") from e + + +# only intended for tests +def _unregister_validators_for_challenge(challenge_name: str) -> None: + __VALIDATORS_BY_CHALLENGE.pop(challenge_name, None) + + +# only intended for tests +def _clear_validators() -> None: + __VALIDATORS_BY_CHALLENGE.clear() + + # only intended for tests def _unregister_param_source_for_name(name): # We intentionally do not specify a default value if the key does not exist. If we try to remove a key that we didn't insert then diff --git a/tests/racecontrol_test.py b/tests/racecontrol_test.py index 5b4d399a5..a660b7aed 100644 --- a/tests/racecontrol_test.py +++ b/tests/racecontrol_test.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# pylint: disable=protected-access import os import re @@ -22,6 +23,14 @@ import pytest from esrally import config, exceptions, racecontrol +from esrally.track import params, track + + +@pytest.fixture(autouse=True) +def _reset_validators(): + # the validator registry is module-global; ensure no registration leaks across tests + yield + params._clear_validators() @pytest.fixture @@ -107,3 +116,66 @@ def test_runs_a_known_pipeline(unittest_pipeline): racecontrol.run(cfg) unittest_pipeline.target.assert_called_once_with(cfg) + + +def _coordinator_cfg(challenge_name, track_params): + cfg = config.Config() + # a pinned distribution version skips the cluster version probe so setup() reaches validation without any I/O + cfg.add(config.Scope.application, "mechanic", "distribution.version", "8.0.0") + cfg.add(config.Scope.application, "track", "challenge.name", challenge_name) + cfg.add(config.Scope.application, "track", "params", track_params) + return cfg + + +def _track_with_challenge(challenge_name): + challenge = track.Challenge(challenge_name, default=True, schedule=[]) + return track.Track(name="unittest", challenges=[challenge]) + + +def test_setup_invokes_track_param_validators_for_selected_challenge(): + cfg = _coordinator_cfg("validate-challenge", {"scheduling": [1, 2, 3]}) + + received = [] + + def validator(track_params): + received.append(track_params) + raise exceptions.TrackConfigError("'scheduling' must have 1 or 2 elements but had 3.") + + params.register_validator("validate-challenge", validator) + with mock.patch("esrally.racecontrol.track.load_track", return_value=_track_with_challenge("validate-challenge")): + coordinator = racecontrol.BenchmarkCoordinator(cfg) + with pytest.raises(exceptions.TrackConfigError, match="'scheduling' must have 1 or 2 elements but had 3."): + coordinator.setup() + # the validator ran fail-fast (before metrics/engine setup) and received the resolved track params + assert received == [{"scheduling": [1, 2, 3]}] + + +@mock.patch("esrally.racecontrol.metrics.race_store") +@mock.patch("esrally.racecontrol.metrics.metrics_store") +@mock.patch("esrally.racecontrol.metrics.create_race") +def test_setup_continues_when_no_validators_registered(create_race, metrics_store, race_store): + cfg = _coordinator_cfg("no-validators-challenge", {"scheduling": [1, 2, 3]}) + + with mock.patch("esrally.racecontrol.track.load_track", return_value=_track_with_challenge("no-validators-challenge")): + coordinator = racecontrol.BenchmarkCoordinator(cfg) + # no validators are registered for this challenge, so setup() must proceed past validation + coordinator.setup() + + create_race.assert_called_once() + + +@mock.patch("esrally.racecontrol.metrics.race_store") +@mock.patch("esrally.racecontrol.metrics.metrics_store") +@mock.patch("esrally.racecontrol.metrics.create_race") +def test_setup_runs_all_validators_and_continues_when_they_pass(create_race, metrics_store, race_store): + cfg = _coordinator_cfg("multi-validator-challenge", {"scheduling": [1]}) + + calls = [] + params.register_validator("multi-validator-challenge", lambda p: calls.append("first")) + params.register_validator("multi-validator-challenge", lambda p: calls.append("second")) + with mock.patch("esrally.racecontrol.track.load_track", return_value=_track_with_challenge("multi-validator-challenge")): + coordinator = racecontrol.BenchmarkCoordinator(cfg) + coordinator.setup() + # both validators ran (in order) and, because they passed, setup() proceeded past validation + assert calls == ["first", "second"] + create_race.assert_called_once() diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index 09adb8f88..ca133bf3f 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -29,7 +29,7 @@ import pytest from esrally import config, exceptions, paths -from esrally.track import loader, track +from esrally.track import loader, params, track from esrally.utils import console, io from esrally.utils.cases import cases @@ -4358,6 +4358,53 @@ def test_rejects_inline_search_operation_document_type(self): reader("unittest", track_specification, "/mappings") +class TestTrackPluginReader: + def test_register_validator_forwards_to_registry(self): + registry = mock.Mock() + reader = loader.TrackPluginReader("/some/track/path", validator_registry=registry) + + def my_validator(params): + pass + + reader.register_validator("my-challenge", my_validator) + + registry.assert_called_once_with("my-challenge", my_validator) + + def test_register_validator_without_registry_is_a_noop(self): + reader = loader.TrackPluginReader("/some/track/path") + # no validator_registry was provided, so forwarding must be skipped without raising + reader.register_validator("my-challenge", lambda params: None) + + +class TestValidatorRegistrationWiring: + @mock.patch("esrally.track.loader.TrackPluginReader") + @mock.patch("esrally.track.loader.track_repo") + def test_load_track_plugins_passes_validator_registry_to_reader(self, track_repo, plugin_reader_class): + # short-circuit before load() so we only assert how the reader is constructed + plugin_reader_class.return_value.can_load.return_value = False + + def my_register_validator(challenge_name, fn): + pass + + loader.load_track_plugins(config.Config(), "unittest-track", register_validator=my_register_validator) + + # validator_registry is the 5th positional arg of TrackPluginReader.__init__ + assert plugin_reader_class.call_args.args[4] is my_register_validator + + @mock.patch("esrally.track.loader.load_track_plugins") + @mock.patch("esrally.track.loader.TrackProcessorRegistry") + @mock.patch("esrally.track.loader.TrackFileReader") + def test_load_single_track_registers_params_validators(self, file_reader, track_processor_registry, load_track_plugins): + # pylint: disable=protected-access + load_track_plugins.return_value = False + track_processor_registry.return_value.processors = [] + + loader._load_single_track(config.Config(), mock.Mock(), "unittest-track") + + # the loader must wire the params validator registry so a track's register() can register validators + assert load_track_plugins.call_args.kwargs["register_validator"] is params.register_validator + + class MyMockTrackProcessor(loader.TrackProcessor): pass diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 69cd6771d..45494b961 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -3358,3 +3358,82 @@ def test_downsample_empty_params(self): assert p["fixed-interval"] == "1h" assert p["target-index"] == f"{p['source-index']}-{p['fixed-interval']}" assert p.get("sampling-method") is None + + +class TestValidatorRegistry: + def teardown_method(self): + params._clear_validators() + + def test_registered_validator_is_invoked(self): + received = [] + + def my_validator(p): + received.append(p) + + params.register_validator("test-challenge", my_validator) + params.invoke_validators("test-challenge", {"key": "value"}) + + assert received == [{"key": "value"}] + + def test_multiple_validators_all_invoked(self): + calls = [] + + params.register_validator("test-challenge", lambda p: calls.append("first")) + params.register_validator("test-challenge", lambda p: calls.append("second")) + params.invoke_validators("test-challenge", {}) + + assert calls == ["first", "second"] + + def test_invoke_with_no_validators_is_a_noop(self): + params.invoke_validators("test-challenge", {"key": "value"}) + + def test_invoke_with_none_params_does_not_crash(self): + received = [] + params.register_validator("test-challenge", received.append) + # a None params payload is normalized to an empty dict before reaching the validator + params.invoke_validators("test-challenge", None) # type: ignore[arg-type] + assert received == [{}] + + def test_registering_the_same_validator_twice_invokes_it_once(self): + calls = [] + + def my_validator(p): + calls.append(p) + + # simulates a track's plugins being (re)loaded more than once in the same process + params.register_validator("test-challenge", my_validator) + params.register_validator("test-challenge", my_validator) + params.invoke_validators("test-challenge", {}) + + assert calls == [{}] + + def test_validator_raising_track_config_error_propagates(self): + def bad_validator(p): + raise exceptions.TrackConfigError("invalid param") + + params.register_validator("test-challenge", bad_validator) + + with pytest.raises(exceptions.TrackConfigError, match="invalid param"): + params.invoke_validators("test-challenge", {}) + + def test_validators_not_invoked_for_different_challenge(self): + called = [] + params.register_validator("test-challenge", lambda p: called.append(True)) + params.invoke_validators("other-challenge", {}) + assert called == [] + + def test_cannot_register_a_non_callable_validator(self): + with pytest.raises(exceptions.RallyAssertionError, match="Validator for challenge \\[test-challenge\\] must be callable"): + # intentionally pass a non-callable to exercise the runtime guard + params.register_validator("test-challenge", "not-callable") # type: ignore[arg-type] + + def test_unexpected_validator_error_is_wrapped_with_context(self): + def buggy_validator(p): + return p["missing-key"] + + params.register_validator("test-challenge", buggy_validator) + + with pytest.raises( + exceptions.TrackConfigError, match="Validator \\[buggy_validator\\] for challenge \\[test-challenge\\] failed: 'missing-key'" + ): + params.invoke_validators("test-challenge", {}) From e086ed26677bdd162f4791c26efa992dc976d7e2 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Thu, 25 Jun 2026 17:43:35 +0400 Subject: [PATCH 2/3] Prevent traceback on parameter validation error --- esrally/racecontrol.py | 7 ++++++- tests/racecontrol_test.py | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index 3d641b79a..6e20d326f 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -112,7 +112,12 @@ def receiveMsg_Setup(self, msg, sender): self.cfg = msg.cfg assert self.cfg is not None self.coordinator = BenchmarkCoordinator(msg.cfg) - self.coordinator.setup(sources=msg.sources) + try: + self.coordinator.setup(sources=msg.sources) + except exceptions.RallyError as e: + self.logger.info("Setup failed due to a Rally error.", exc_info=e) + self.send(sender, actor.BenchmarkFailure(e.full_message)) + return self.logger.info("Asking mechanic to start the engine.") self.mechanic = self.createActor(mechanic.MechanicActor, targetActorRequirements={"coordinator": True}) self.send( diff --git a/tests/racecontrol_test.py b/tests/racecontrol_test.py index a9f4ad496..012606489 100644 --- a/tests/racecontrol_test.py +++ b/tests/racecontrol_test.py @@ -150,6 +150,28 @@ def validator(track_params): assert received == [{"scheduling": [1, 2, 3]}] +def test_benchmark_actor_reports_rally_error_from_setup_without_traceback(): + cfg = _coordinator_cfg("validate-challenge", {"scheduling": [1, 2, 3]}) + benchmark_actor = racecontrol.BenchmarkActor() + sender = mock.Mock() + + with ( + mock.patch.object(benchmark_actor, "send") as send, + mock.patch( + "esrally.racecontrol.BenchmarkCoordinator.setup", + side_effect=exceptions.TrackConfigError("invalid track parameters"), + ), + ): + benchmark_actor.receiveMsg_Setup(racecontrol.Setup(cfg), sender) + + send.assert_called_once() + assert send.call_args.args[0] is sender + failure = send.call_args.args[1] + assert isinstance(failure, racecontrol.actor.BenchmarkFailure) + assert failure.message == "invalid track parameters" + assert "Traceback" not in failure.message + + @mock.patch("esrally.racecontrol.metrics.race_store") @mock.patch("esrally.racecontrol.metrics.metrics_store") @mock.patch("esrally.racecontrol.metrics.create_race") From f54908256e6b5acc6c4a61389f528452c07cacdf Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Fri, 26 Jun 2026 15:06:46 +0400 Subject: [PATCH 3/3] Fix unit test in CI --- tests/racecontrol_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/racecontrol_test.py b/tests/racecontrol_test.py index 012606489..445dc0709 100644 --- a/tests/racecontrol_test.py +++ b/tests/racecontrol_test.py @@ -152,9 +152,11 @@ def validator(track_params): def test_benchmark_actor_reports_rally_error_from_setup_without_traceback(): cfg = _coordinator_cfg("validate-challenge", {"scheduling": [1, 2, 3]}) - benchmark_actor = racecontrol.BenchmarkActor() sender = mock.Mock() + with mock.patch("esrally.actor.log.post_configure_actor_logging"): + benchmark_actor = racecontrol.BenchmarkActor() + with ( mock.patch.object(benchmark_actor, "send") as send, mock.patch(