Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,38 @@ Custom parameter sources can use the Python standard API but using any additiona

You can also implement your parameter sources and runners in multiple Python files but the main entry point is always ``track.py``. The root package name of your plugin is the name of your track.

.. _adding_tracks_custom_validators:

Validating Track Parameters
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Tracks that rely on parameters supplied via ``--track-params`` (or a parameters file) often need those parameters to satisfy certain constraints. Rather than failing deep inside a parameter source or runner with a confusing error, you can register a validator that Rally invokes for the selected challenge before the benchmark starts. This lets you reject invalid parameters up front with a clear, actionable message.

A validator is any callable that accepts the resolved track parameters and raises ``esrally.exceptions.TrackConfigError`` if they are invalid::

from esrally.exceptions import TrackConfigError


def validate_autoscaling(params):
    """Require ``scheduling`` to hold a single element or one element per autoscaling phase."""
    phases = params.get("autoscaling_phases", [])
    scheduling = params.get("scheduling", [])
    actual = len(scheduling)
    expected = len(phases)
    if actual == 1 or actual == expected:
        return
    raise TrackConfigError(
        f"'scheduling' must contain either a single element or one element per phase "
        f"({expected}) but had {actual}."
    )


def register(registry):
    """Register ``validate_autoscaling`` for the challenge named ``autoscaling``."""
    challenge_name = "autoscaling"
    registry.register_validator(challenge_name, validate_autoscaling)

Note the following:

* The first argument to ``register_validator`` is the **challenge name**. Rally only invokes validators registered for the challenge that is selected for the benchmark.
* The validator receives the track parameters as a dictionary (the values supplied via ``--track-params`` or a parameters file).
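* Raise ``TrackConfigError`` to abort the benchmark. Rally runs the validators after loading the track and selecting the challenge, but before provisioning nodes or starting the benchmark, so a misconfigured parameter is reported without waiting for the engine to start. Any other exception raised by a validator also aborts the benchmark; Rally reports it as a ``TrackConfigError`` that names the validator and the challenge.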
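* You may register more than one validator for the same challenge; Rally invokes them in registration order and stops at the first one that raises. Registering the same function again for the same challenge has no effect.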

.. _adding_tracks_custom_runners:

Creating Your Own Operations With Custom Runners
Expand Down
11 changes: 10 additions & 1 deletion esrally/racecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
types,
version,
)
from esrally.track import params
from esrally.utils import console, opts, versions

pipelines = collections.OrderedDict()
Expand Down Expand Up @@ -111,7 +112,12 @@ def receiveMsg_Setup(self, msg, sender):
self.cfg = msg.cfg
assert self.cfg is not None
self.coordinator = BenchmarkCoordinator(msg.cfg)
self.coordinator.setup(sources=msg.sources)
try:
self.coordinator.setup(sources=msg.sources)
except exceptions.RallyError as e:
self.logger.info("Setup failed due to a Rally error.", exc_info=e)
self.send(sender, actor.BenchmarkFailure(e.full_message))
return
self.logger.info("Asking mechanic to start the engine.")
self.mechanic = self.createActor(mechanic.MechanicActor, targetActorRequirements={"coordinator": True})
self.send(
Expand Down Expand Up @@ -244,6 +250,9 @@ def setup(self, sources=False):
self.current_track.name, challenge_name, PROGRAM_NAME
)
)
# Validate track parameters for the selected challenge before provisioning the engine so that invalid
# parameters fail fast. Track plugins were loaded (and any validators registered) during load_track above.
params.invoke_validators(self.current_challenge.name, self.cfg.opts("track", "params", mandatory=False, default_value={}))
if self.current_challenge.user_info:
console.info(self.current_challenge.user_info)
for message in self.current_challenge.serverless_info:
Expand Down
20 changes: 17 additions & 3 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,12 @@ def _load_single_track(cfg: types.Config, track_repository, track_name, install_
tpr = TrackProcessorRegistry(cfg)
if install_dependencies:
_install_dependencies(current_track.dependencies)
has_plugins = load_track_plugins(cfg, track_name, register_track_processor=tpr.register_track_processor)
has_plugins = load_track_plugins(
cfg,
track_name,
register_track_processor=tpr.register_track_processor,
register_validator=params.register_validator,
)
current_track.has_plugins = has_plugins
for processor in tpr.processors:
processor.on_after_load_track(current_track)
Expand All @@ -298,6 +303,7 @@ def load_track_plugins(
register_scheduler=None,
register_track_processor=None,
force_update=False,
register_validator=None,
):
"""
Loads plugins that are defined for the current track (as specified by the configuration).
Expand All @@ -309,12 +315,13 @@ def load_track_plugins(
:param register_track_processor: An optional function where track processors can be registered.
:param force_update: If set to ``True`` this ensures that the track is first updated from the remote repository.
Defaults to ``False``.
:param register_validator: An optional function where challenge validators can be registered.
:return: True iff this track defines plugins and they have been loaded.
"""
repo = track_repo(cfg, fetch=force_update, update=force_update)
track_plugin_path = repo.track_dir(track_name)
LOG.debug("Invoking plugin_reader with name [%s] resolved to path [%s]", track_name, track_plugin_path)
plugin_reader = TrackPluginReader(track_plugin_path, register_runner, register_scheduler, register_track_processor)
plugin_reader = TrackPluginReader(track_plugin_path, register_runner, register_scheduler, register_track_processor, register_validator)

if plugin_reader.can_load():
plugin_reader.load()
Expand Down Expand Up @@ -1272,10 +1279,13 @@ class TrackPluginReader:
Loads track plugins
"""

def __init__(self, track_plugin_path, runner_registry=None, scheduler_registry=None, track_processor_registry=None):
def __init__(
    self,
    track_plugin_path,
    runner_registry=None,
    scheduler_registry=None,
    track_processor_registry=None,
    validator_registry=None,
):
    """
    Creates a reader for the track plugin located at ``track_plugin_path``.

    :param track_plugin_path: Root path that is handed to the component loader.
    :param runner_registry: Optional registry stored for runner registration.
    :param scheduler_registry: Optional registry stored for scheduler registration.
    :param track_processor_registry: Optional callable that is invoked with each registered track processor.
    :param validator_registry: Optional callable that is invoked with ``(challenge_name, fn)`` for each registered validator.
    """
    # All registries are optional; the ``register_*`` methods visible here skip forwarding when one is unset.
    self.runner_registry = runner_registry
    self.scheduler_registry = scheduler_registry
    self.track_processor_registry = track_processor_registry
    self.validator_registry = validator_registry
    self.loader = modules.ComponentLoader(
        root_path=track_plugin_path,
        component_entry_point="track",
    )

def can_load(self):
Expand Down Expand Up @@ -1310,6 +1320,10 @@ def register_track_processor(self, track_processor):
if self.track_processor_registry:
self.track_processor_registry(track_processor)

def register_validator(self, challenge_name, fn):
    """
    Forwards a validator registration to the configured validator registry.

    :param challenge_name: Name of the challenge the validator is registered for.
    :param fn: The validator to register.
    """
    registry = self.validator_registry
    if not registry:
        # no registry was supplied to this reader, so there is nothing to forward to
        return
    registry(challenge_name, fn)

@property
def meta_data(self):
return {
Expand Down
32 changes: 32 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

__PARAM_SOURCES_BY_OP: dict[track.OperationType, ParamSource] = {}
__PARAM_SOURCES_BY_NAME: dict[str, ParamSource] = {}
__VALIDATORS_BY_CHALLENGE: dict[str, list[Callable]] = collections.defaultdict(list)


def param_source_for_operation(op_type, track, params, task_name):
Expand Down Expand Up @@ -69,6 +70,37 @@ def register_param_source_for_name(name, param_source_class):
__PARAM_SOURCES_BY_NAME[name] = param_source_class


def register_validator(challenge_name: str, fn: Callable) -> None:
    """
    Registers a track parameter validator for a challenge.

    :param challenge_name: Name of the challenge the validator applies to.
    :param fn: The validator. It must be callable.
    :raises exceptions.RallyAssertionError: If ``fn`` is not callable.
    """
    if not callable(fn):
        raise exceptions.RallyAssertionError(f"Validator for challenge [{challenge_name}] must be callable but was [{fn}].")
    registered = __VALIDATORS_BY_CHALLENGE[challenge_name]
    # A track's plugins may be (re)loaded more than once within a process. Registering a function that is already
    # present is therefore a no-op so that the same validator is never invoked multiple times.
    if fn in registered:
        return
    registered.append(fn)


def invoke_validators(challenge_name: str, track_params: dict) -> None:
    """
    Runs every validator registered for ``challenge_name``, in registration order.

    :param challenge_name: Name of the challenge whose validators should run.
    :param track_params: The track parameters passed to each validator. A falsy value is replaced by an empty dict.
    :raises exceptions.TrackConfigError: If a validator raises it, or if a validator fails with any other exception.
    """
    registered = __VALIDATORS_BY_CHALLENGE.get(challenge_name, [])
    for check in registered:
        try:
            check(track_params or {})
        except exceptions.TrackConfigError:
            # the validator deliberately rejected the parameters; keep its message untouched
            raise
        except Exception as e:
            check_name = getattr(check, "__name__", repr(check))
            raise exceptions.TrackConfigError(f"Validator [{check_name}] for challenge [{challenge_name}] failed: {e}") from e


# only intended for tests
def _unregister_validators_for_challenge(challenge_name: str) -> None:
    """Removes all validators registered for ``challenge_name``; does nothing if none are registered."""
    if challenge_name in __VALIDATORS_BY_CHALLENGE:
        del __VALIDATORS_BY_CHALLENGE[challenge_name]


# only intended for tests
def _clear_validators() -> None:
    """Removes every registered validator for every challenge."""
    registry = __VALIDATORS_BY_CHALLENGE
    registry.clear()


# only intended for tests
def _unregister_param_source_for_name(name):
# We intentionally do not specify a default value if the key does not exist. If we try to remove a key that we didn't insert then
Expand Down
96 changes: 96 additions & 0 deletions tests/racecontrol_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access

import os
from unittest import mock

import pytest

from esrally import config, exceptions, racecontrol
from esrally.track import params, track
from esrally.utils import opts


@pytest.fixture(autouse=True)
def _reset_validators():
    """Empties the module-global validator registry after each test so that no registration leaks across tests."""
    yield
    # teardown: runs once the test using this fixture has finished
    params._clear_validators()


@pytest.fixture
def running_in_docker():
os.environ["RALLY_RUNNING_IN_DOCKER"] = "true"
Expand Down Expand Up @@ -109,6 +118,93 @@ def test_runs_a_known_pipeline(unittest_pipeline):
unittest_pipeline.target.assert_called_once_with(cfg)


def _coordinator_cfg(challenge_name, track_params):
    """Builds a config that selects ``challenge_name`` and carries ``track_params`` as the track parameters."""
    settings = (
        # a pinned distribution version skips the cluster version probe so setup() reaches validation without any I/O
        ("mechanic", "distribution.version", "8.0.0"),
        ("track", "challenge.name", challenge_name),
        ("track", "params", track_params),
    )
    cfg = config.Config()
    for section, key, value in settings:
        cfg.add(config.Scope.application, section, key, value)
    return cfg


def _track_with_challenge(challenge_name):
    """Returns a track named ``unittest`` whose only challenge is a default one with an empty schedule."""
    return track.Track(
        name="unittest",
        challenges=[track.Challenge(challenge_name, default=True, schedule=[])],
    )


def test_setup_invokes_track_param_validators_for_selected_challenge():
    challenge_name = "validate-challenge"
    error_message = "'scheduling' must have 1 or 2 elements but had 3."
    seen_params = []

    def rejecting_validator(track_params):
        seen_params.append(track_params)
        raise exceptions.TrackConfigError(error_message)

    cfg = _coordinator_cfg(challenge_name, {"scheduling": [1, 2, 3]})
    params.register_validator(challenge_name, rejecting_validator)
    unittest_track = _track_with_challenge(challenge_name)

    with mock.patch("esrally.racecontrol.track.load_track", return_value=unittest_track):
        coordinator = racecontrol.BenchmarkCoordinator(cfg)
        with pytest.raises(exceptions.TrackConfigError, match=error_message):
            coordinator.setup()

    # the validator was invoked exactly once, with the track params from the config, and its error reached the caller
    assert seen_params == [{"scheduling": [1, 2, 3]}]


def test_benchmark_actor_reports_rally_error_from_setup_without_traceback():
    cfg = _coordinator_cfg("validate-challenge", {"scheduling": [1, 2, 3]})
    sender = mock.Mock()
    setup_error = exceptions.TrackConfigError("invalid track parameters")

    with mock.patch("esrally.actor.log.post_configure_actor_logging"):
        benchmark_actor = racecontrol.BenchmarkActor()

    patched_send = mock.patch.object(benchmark_actor, "send")
    patched_setup = mock.patch("esrally.racecontrol.BenchmarkCoordinator.setup", side_effect=setup_error)
    with patched_send as send, patched_setup:
        benchmark_actor.receiveMsg_Setup(racecontrol.Setup(cfg), sender)

    # exactly one message goes back to the sender: a failure that carries only the error message
    send.assert_called_once()
    sent_args = send.call_args.args
    assert sent_args[0] is sender
    failure = sent_args[1]
    assert isinstance(failure, racecontrol.actor.BenchmarkFailure)
    assert failure.message == "invalid track parameters"
    assert "Traceback" not in failure.message


@mock.patch("esrally.racecontrol.metrics.race_store")
@mock.patch("esrally.racecontrol.metrics.metrics_store")
@mock.patch("esrally.racecontrol.metrics.create_race")
def test_setup_continues_when_no_validators_registered(create_race, metrics_store, race_store):
    challenge_name = "no-validators-challenge"
    cfg = _coordinator_cfg(challenge_name, {"scheduling": [1, 2, 3]})
    unittest_track = _track_with_challenge(challenge_name)

    with mock.patch("esrally.racecontrol.track.load_track", return_value=unittest_track):
        coordinator = racecontrol.BenchmarkCoordinator(cfg)
        # nothing is registered for this challenge, so validation is a no-op and setup() must carry on
        coordinator.setup()

    create_race.assert_called_once()


@mock.patch("esrally.racecontrol.metrics.race_store")
@mock.patch("esrally.racecontrol.metrics.metrics_store")
@mock.patch("esrally.racecontrol.metrics.create_race")
def test_setup_runs_all_validators_and_continues_when_they_pass(create_race, metrics_store, race_store):
    challenge_name = "multi-validator-challenge"
    cfg = _coordinator_cfg(challenge_name, {"scheduling": [1]})
    calls = []

    def recording_validator(label):
        # each call returns a distinct function object, so both registrations are kept
        return lambda p: calls.append(label)

    for label in ("first", "second"):
        params.register_validator(challenge_name, recording_validator(label))

    with mock.patch("esrally.racecontrol.track.load_track", return_value=_track_with_challenge(challenge_name)):
        coordinator = racecontrol.BenchmarkCoordinator(cfg)
        coordinator.setup()

    # both validators ran in registration order and, since neither raised, setup() went on past validation
    assert calls == ["first", "second"]
    create_race.assert_called_once()


def test_multi_cluster_flag_rejected_with_single_host():
"""--multi-cluster with a single host in --target-hosts should be caught by CLI validation."""
# This validation happens in configure_connection_params (rally.py), not racecontrol,
Expand Down
49 changes: 48 additions & 1 deletion tests/track/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import pytest

from esrally import config, exceptions, paths
from esrally.track import loader, track
from esrally.track import loader, params, track
from esrally.utils import console, io
from esrally.utils.cases import cases

Expand Down Expand Up @@ -4358,6 +4358,53 @@ def test_rejects_inline_search_operation_document_type(self):
reader("unittest", track_specification, "/mappings")


class TestTrackPluginReader:
    """Covers how ``TrackPluginReader.register_validator`` forwards to the optional validator registry."""

    def test_register_validator_forwards_to_registry(self):
        validator_registry = mock.Mock()

        def my_validator(track_params):
            pass

        plugin_reader = loader.TrackPluginReader("/some/track/path", validator_registry=validator_registry)
        plugin_reader.register_validator("my-challenge", my_validator)

        validator_registry.assert_called_once_with("my-challenge", my_validator)

    def test_register_validator_without_registry_is_a_noop(self):
        # the reader is built without a validator_registry; registering must then be skipped without raising
        plugin_reader = loader.TrackPluginReader("/some/track/path")

        def ignored_validator(track_params):
            return None

        plugin_reader.register_validator("my-challenge", ignored_validator)


class TestValidatorRegistrationWiring:
    """Checks that the validator registry is handed through the loader down to the plugin reader."""

    @mock.patch("esrally.track.loader.TrackPluginReader")
    @mock.patch("esrally.track.loader.track_repo")
    def test_load_track_plugins_passes_validator_registry_to_reader(self, track_repo, plugin_reader_class):
        def my_register_validator(challenge_name, fn):
            pass

        # can_load() returning False stops before load(), so only the reader's construction is observed
        plugin_reader_class.return_value.can_load.return_value = False

        loader.load_track_plugins(config.Config(), "unittest-track", register_validator=my_register_validator)

        # validator_registry is the 5th positional arg of TrackPluginReader.__init__
        constructor_args = plugin_reader_class.call_args.args
        assert constructor_args[4] is my_register_validator

    @mock.patch("esrally.track.loader.load_track_plugins")
    @mock.patch("esrally.track.loader.TrackProcessorRegistry")
    @mock.patch("esrally.track.loader.TrackFileReader")
    def test_load_single_track_registers_params_validators(self, file_reader, track_processor_registry, load_track_plugins):
        # pylint: disable=protected-access
        track_processor_registry.return_value.processors = []
        load_track_plugins.return_value = False

        loader._load_single_track(config.Config(), mock.Mock(), "unittest-track")

        # the loader must pass params.register_validator along so that a track's register() can register validators
        passed_kwargs = load_track_plugins.call_args.kwargs
        assert passed_kwargs["register_validator"] is params.register_validator


class MyMockTrackProcessor(loader.TrackProcessor):
    """A ``loader.TrackProcessor`` subclass that adds and overrides nothing."""

Expand Down
Loading
Loading