diff --git a/CHANGES.next.md b/CHANGES.next.md index a5176a65a4..86e90ca732 100644 --- a/CHANGES.next.md +++ b/CHANGES.next.md @@ -1,3 +1,10 @@ +### New features: +- Add kubernetes_management benchmark for measuring GKE/EKS/AKS management + plane API responsiveness. (from @ashishsuneja) +- Add KubernetesCluster base class management plane abstract methods: + CreateNodePool, DeleteNodePool, UpgradeNodePool, UpdateCluster and + their async counterparts. (from @ashishsuneja) + ### Breaking changes: - Added --accept_licenses flag. User have to turn this flag on to acknowledge diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py index 9f44b52c78..579dd8f068 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py @@ -13,16 +13,43 @@ # limitations under the License. """Benchmark for Kubernetes management plane operations. -TODO: Add comments & implement. +Measures GKE/EKS/AKS control-plane API responsiveness via three scenarios: + A. Concurrent node-pool create/upgrade/delete. + B. Node-pool create overlapping with a long-running cluster update. + C. Large-scale node-pool provisioning (single scale or sweep). + +Optimizations for minimum run time: + - Streaming concurrency in Scenario C (no batch barriers) + - Optional pipelined Scenario A (create->upgrade->delete per thread) + - Reduced poll_interval in provider WaitForOperation (5s vs 10s) + - Per-op threads capped at _MAX_CONCURRENT to avoid OS limits + - Accurate delete success rate via attempted_ops denominator """ -from typing import Any +import copy +import dataclasses +import statistics +import threading +import time +from typing import Callable +from absl import flags +from absl import logging +from perfkitbenchmarker import background_tasks from perfkitbenchmarker import benchmark_spec as bm_spec from perfkitbenchmarker import configs +from perfkitbenchmarker import errors from perfkitbenchmarker import sample +from perfkitbenchmarker.configs import benchmark_config_spec +from perfkitbenchmarker.resources.container_service import ( + container as container_lib, +) +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_cluster + +_SLEEP_POD_NAME = "pkb-mgmt-sleep" -BENCHMARK_NAME = 'kubernetes_management' +BENCHMARK_NAME = "kubernetes_management" BENCHMARK_CONFIG = """ kubernetes_management: @@ -30,29 +57,764 @@ Benchmarks GKE/EKS/AKS management plane operations: concurrent node pool create/upgrade/delete, overlapping cluster + node-pool ops, and large-scale provisioning. Focused on control-plane API responsiveness. + Spec regions: GCP us-central1, AWS us-east-1 (closest), Azure eastus. + Equivalent machine types across clouds per Google benchmark spec. container_cluster: type: Kubernetes vm_count: 1 + vm_spec: + GCP: + # us-central1-a: spec primary region for GCP + # e2-standard-2: 2 vCPU 8GB — equivalent to t3.medium / D2s_v3 + machine_type: e2-standard-2 + zone: us-central1-a + AWS: + # us-east-1a: closest comparable region to GCP us-central1 + # t3.medium: 2 vCPU 4GB — closest equivalent to e2-standard-2 + machine_type: t3.medium + zone: us-east-1a + Azure: + # eastus: closest comparable region to GCP us-central1 + # Standard_D2s_v3: 2 vCPU 8GB — equivalent to e2-standard-2 + machine_type: Standard_D2s_v3 + zone: eastus """ +_VALID_SCENARIOS = frozenset({"A", "B", "C"}) + +_CONCURRENT_NODEPOOLS = flags.DEFINE_integer( + "k8s_mgmt_concurrent_nodepools", + 5, + "Number of node pools to create/upgrade/delete concurrently in Scenario A.", +) +_LARGE_SCALE_NODEPOOLS = flags.DEFINE_integer( + "k8s_mgmt_large_scale_nodepools", + 1000, + "Number of node pools to provision in the large-scale Scenario C. " + + "Spec target is 1000; ensure VPC/quota is available before running.", +) +_NODES_PER_NODEPOOL = flags.DEFINE_integer( + "k8s_mgmt_nodes_per_nodepool", + 2, + "Number of nodes per node pool. Google spec: 2 nodes per pool.", +) +_INITIAL_VERSION = flags.DEFINE_string( + "k8s_mgmt_initial_version", + None, + "Kubernetes version for newly-created node pools (N-1). None = auto.", +) +_TARGET_VERSION = flags.DEFINE_string( + "k8s_mgmt_target_version", + None, + "Kubernetes version to upgrade node pools to (N). None = cluster version.", +) +_SCENARIOS = flags.DEFINE_list( + "k8s_mgmt_scenarios", + ["A", "B", "C"], + "Comma-separated subset of scenarios to run. Valid values: A, B, C.", +) +_SCALE_SWEEP = flags.DEFINE_list( + "k8s_mgmt_scale_sweep", + [], + "Comma-separated list of node-pool counts for Scenario C scale sweep. " + + "Each scale runs as a separate sub-run with full create/delete cycle. " + + "Example: --k8s_mgmt_scale_sweep=10,50,100,500,1000. " + + "If empty, uses --k8s_mgmt_large_scale_nodepools.", +) +_MAX_CONCURRENT = flags.DEFINE_integer( + "k8s_mgmt_max_concurrent", + 50, + "Cap on concurrent provider API calls within a batch. " + + "Higher = faster but more aggressive on connection pools.", +) +_PIPELINE_SCENARIO_A = flags.DEFINE_boolean( + "k8s_mgmt_pipeline_scenario_a", + True, + "If True, run Scenario A as per-pool pipeline (create->upgrade->delete " + + "back-to-back per thread). Minimizes wall time. " + + "Default False for spec-strict phase-by-phase.", +) + +# AKS caps node-pool names at 12 chars — keep all names within that limit. +_PREFIX = "pkbm" + + +def _ScenarioAName(i): + return f"{_PREFIX}a{i:03d}" + + +_SCENARIO_B_NAME = f"{_PREFIX}b" + + +def _ScenarioCName(i): + return f"{_PREFIX}c{i:04d}" + -def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]: - """Returns the configuration of a benchmark.""" +@dataclasses.dataclass +class _OpResult: + """Holds timing and outcome for a single async management-plane operation.""" + + name: str + init_dur: float + e2e_dur: float + error: Exception | None = None + + def __iter__(self): + yield self.name + yield self.init_dur + yield self.e2e_dur + yield self.error + + +def GetConfig(user_config): return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) -def CheckPrerequisites(benchmark_config: bm_spec.BenchmarkSpec) -> None: - del benchmark_config +def CheckPrerequisites( + benchmark_config: benchmark_config_spec.BenchmarkConfigSpec, +): + """Validates flag values and cluster type before any cloud calls.""" + invalid = [s for s in _SCENARIOS.value if s.strip() not in _VALID_SCENARIOS] + if invalid: + raise errors.Config.InvalidValue( + f"Invalid value(s) for --k8s_mgmt_scenarios: {invalid}. " + + f"Valid options: {sorted(_VALID_SCENARIOS)}." + ) + for s in _SCALE_SWEEP.value: + try: + int(s.strip()) + except ValueError as e: + raise errors.Config.InvalidValue( + f"Non-integer value in --k8s_mgmt_scale_sweep: {s!r}" + ) from e + if benchmark_config.container_cluster.type != "Kubernetes": + raise errors.Config.InvalidValue( + "kubernetes_management benchmark requires a Kubernetes" + + " container cluster." + ) def Prepare(benchmark_spec: bm_spec.BenchmarkSpec) -> None: - del benchmark_spec + """Asserts the cluster is reachable; deploys spec-defined sleep workload.""" + cluster = benchmark_spec.container_cluster + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + benchmark_spec.always_call_cleanup = True + logging.info( + "kubernetes_management Prepare: cluster=%s, version=%s", + cluster.name, + cluster.k8s_version, + ) + # Spec workload: "a simple container that sleeps for a given time". + # Confirms data-plane reachability; generates no data-plane load. + kubectl.RunKubectlCommand( + [ + "run", + _SLEEP_POD_NAME, + "--image=busybox", + "--restart=Never", + "--", + "sleep", + "86400", + ], + ) + + +def _CleanStartSweep(cluster: kubernetes_cluster.KubernetesCluster) -> None: + """Deletes any stale pkbm* node pools so each run starts clean (spec C.2).""" + stale = [n for n in cluster.GetNodePoolNames() if n.startswith(_PREFIX)] + if not stale: + logging.info("CleanStart: no stale pools found — clean start confirmed.") + return + logging.info("CleanStart: deleting %d stale pools: %s", len(stale), stale) + background_tasks.RunThreaded(cluster.DeleteNodePool, stale) def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]: - del benchmark_spec - return [] + """Runs the selected scenarios and returns flat list of samples.""" + cluster = benchmark_spec.container_cluster + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + + # Spec C.2: start clean. + _CleanStartSweep(cluster) + + # Resolve versions once; log clearly; tag every sample. + # Google spec: initial=N-1, target=N (adjacent minor upgrade). + flag_initial = _INITIAL_VERSION.value + flag_target = _TARGET_VERSION.value + if not (flag_initial and flag_target): + resolved_initial, resolved_target = cluster.ResolveNodePoolVersions() + flag_initial = flag_initial or resolved_initial + flag_target = flag_target or resolved_target + initial, target = flag_initial, flag_target + if _INITIAL_VERSION.value and _TARGET_VERSION.value: + source = "flags" + elif not (_INITIAL_VERSION.value or _TARGET_VERSION.value): + source = "auto-resolved" + else: + source = "mixed" + + logging.info( + "NodePool versions (%s): initial=%s -> target=%s " + + "(cluster k8s_version=%s) | nodes_per_pool=%d | machine_type=%s", + source, + initial, + target, + cluster.k8s_version, + _NODES_PER_NODEPOOL.value, + cluster.default_nodepool.machine_type + if hasattr(cluster, "default_nodepool") + else "unknown", + ) + + scenarios = {s.strip().upper() for s in _SCENARIOS.value} + samples: list[sample.Sample] = [] + + if "A" in scenarios: + samples += _RunScenarioA(cluster, initial, target) + if "B" in scenarios: + samples += _RunScenarioB(cluster, initial) + if "C" in scenarios: + # fix: Scenario A/B pools may still be in Deleting state and count + # toward AKS's 100-pool cluster limit. Sweep them out before Scenario C + # so we don't hit MaxAgentPoolCountReached mid-run. + _CleanStartSweep(cluster) + scales = ( + [int(x.strip()) for x in _SCALE_SWEEP.value] + if _SCALE_SWEEP.value + else [_LARGE_SCALE_NODEPOOLS.value] + ) + logging.info("Scenario C: scale sweep = %s", scales) + for scale in scales: + scenario_c_samples = _RunScenarioC(cluster, initial, scale) + for s in scenario_c_samples: + s.metadata["scenario_c_scale"] = str(scale) + samples += scenario_c_samples + + # Tag all samples with version path and run config for published results. + run_meta = { + "initial_version": str(initial), + "target_version": str(target), + "cluster_k8s_version": str(cluster.k8s_version), + "nodes_per_nodepool": str(_NODES_PER_NODEPOOL.value), + "concurrent_nodepools": str(_CONCURRENT_NODEPOOLS.value), + } + for s in samples: + s.metadata.update(run_meta) + + return samples def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None: - del benchmark_spec + """Best-effort delete of leftover benchmark node pools and sleep pod.""" + cluster = benchmark_spec.container_cluster + if cluster is None: + return + kubectl.RunKubectlCommand( + ["delete", "pod", _SLEEP_POD_NAME, "--ignore-not-found"], + raise_on_failure=False, + ) + leftover = [n for n in cluster.GetNodePoolNames() if n.startswith(_PREFIX)] + if not leftover: + return + logging.info("Cleanup: deleting %d leftover node pools", len(leftover)) + background_tasks.RunThreaded(cluster.DeleteNodePool, leftover) + + +# --------------------------------------------------------------------------- +# Scenario A +# --------------------------------------------------------------------------- + + +def _RunScenarioA( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, + target: str, +) -> list[sample.Sample]: + """Concurrent CreateNodePool, UpgradeNodePool, DeleteNodePool.""" + n = _CONCURRENT_NODEPOOLS.value + if _PIPELINE_SCENARIO_A.value: + logging.info( + "Scenario A (pipelined): %d pools, initial=%s, target=%s", + n, + initial, + target, + ) + return _RunScenarioAPipelined(cluster, n, initial, target) + + logging.info( + "Scenario A (phase-by-phase): %d pools, initial=%s, target=%s", + n, + initial, + target, + ) + pool_names = [_ScenarioAName(i) for i in range(n)] + configs_ = [_MakeNodePoolConfig(cluster, name) for name in pool_names] + samples: list[sample.Sample] = [] + + # ── Phase 1: concurrent creates ───────────────────────────────────────── + create_results = _RunAsync( + kickoff=lambda cfg: cluster.CreateNodePoolAsync( + cfg, node_version=initial + ), + wait_fn=cluster.WaitForOperation, + items=configs_, + get_name=lambda cfg: cfg.name, + ) + samples += _OpSamples( + "ScenarioA_Create", create_results, attempted_ops=len(pool_names) + ) + + # ── Phase 2: concurrent upgrades (only successfully created pools) ─────── + created = [r.name for r in create_results if r.error is None] + logging.info( + "Scenario A: %d/%d pools created — proceeding to upgrade", len(created), n + ) + upgrade_results = _RunAsync( + kickoff=lambda name: cluster.UpgradeNodePoolAsync(name, target), + wait_fn=cluster.WaitForOperation, + items=created, + get_name=str, + ) + samples += _OpSamples( + "ScenarioA_Upgrade", upgrade_results, attempted_ops=len(created) + ) + + # # ── Idiomatic Control Plane Synchronization Barrier ────────────────────── + # # Give the GKE control plane a brief window to register the async ops. + # time.sleep(15) + + # # Check if the cluster object has our native upgrade tracking capability. + # if hasattr(cluster, 'HasActiveUpgradeOperations'): + # logging.info('GCP GKE cluster detected; polling via provider API.') + + # while cluster.HasActiveUpgradeOperations(): + # logging.info( + # 'Upgrade operations active; holding delete phase for 30s.') + # time.sleep(30) + + # logging.info( + # 'All upgrade ops completed; flushing API gateway write-locks.') + # time.sleep(10) + # else: + # # Non-GCP providers (Azure AKS / AWS EKS): standard safety pause. + # logging.info( + # 'Non-GCP cluster; proceeding with stabilization pause.') + # time.sleep(5) + + # ── Phase 3: concurrent deletes (live-list to catch EKS rollbacks) ────── + alive = [p for p in cluster.GetNodePoolNames() if p.startswith(f"{_PREFIX}a")] + logging.info( + "Scenario A: %d live pools found for delete (originally %d)", + len(alive), + n, + ) + delete_results = _RunAsync( + kickoff=cluster.DeleteNodePoolAsync, + wait_fn=cluster.WaitForOperation, + items=alive, + get_name=str, + ) + # attempted_ops=n: success rate reflects original request, not just live. + # EKS rolls back timed-out pools silently — without this shows 100%. + samples += _OpSamples("ScenarioA_Delete", delete_results, attempted_ops=n) + return samples + + +def _RunScenarioAPipelined( + cluster: kubernetes_cluster.KubernetesCluster, + n: int, + initial: str, + target: str, +) -> list[sample.Sample]: + """Per-pool pipeline: create->upgrade->delete back-to-back per thread. + + Minimizes wall time: max_i(create_i + upgrade_i + delete_i) vs + max(creates)+max(upgrades)+max(deletes) in phase-by-phase mode. + Trade-off: ops run under mixed-type concurrent load. + """ + pool_names = [_ScenarioAName(i) for i in range(n)] + creates = _Results() + upgrades = _Results() + deletes = _Results() + + def DoPool(pool_name: str): + """Runs timed create/upgrade/delete for one pool.""" + cfg = _MakeNodePoolConfig(cluster, pool_name) + init, e2e, err = _TimedAsync( + lambda: cluster.CreateNodePoolAsync(cfg, node_version=initial), + cluster.WaitForOperation, + ) + creates.add(pool_name, init, e2e, err) + if err is not None: + return + init, e2e, err = _TimedAsync( + lambda: cluster.UpgradeNodePoolAsync(pool_name, target), + cluster.WaitForOperation, + ) + upgrades.add(pool_name, init, e2e, err) + init, e2e, err = _TimedAsync( + lambda: cluster.DeleteNodePoolAsync(pool_name), + cluster.WaitForOperation, + ) + deletes.add(pool_name, init, e2e, err) + + background_tasks.RunThreaded( + DoPool, + pool_names, + max_concurrent_threads=min(n, _MAX_CONCURRENT.value), + ) + samples: list[sample.Sample] = [] + samples += _OpSamples("ScenarioA_Create", creates.entries, attempted_ops=n) + samples += _OpSamples("ScenarioA_Upgrade", upgrades.entries, attempted_ops=n) + samples += _OpSamples("ScenarioA_Delete", deletes.entries, attempted_ops=n) + return samples + + +# --------------------------------------------------------------------------- +# Scenario B +# --------------------------------------------------------------------------- + + +def _RunScenarioB( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, +) -> list[sample.Sample]: + """CreateNodePool fired concurrently with a long-running cluster update. + + Both ops kick off async on separate threads; initiation + E2E latency + recorded independently. Overlap window = ClusterUpdate E2E latency. + """ + logging.info("Scenario B: overlapping cluster update + node-pool create") + cfg = _MakeNodePoolConfig(cluster, _SCENARIO_B_NAME) + results = _Results() + + def DoClusterUpdate(): + init, e2e, err = _TimedAsync( + cluster.UpdateClusterAsync, cluster.WaitForOperation + ) + results.add("ScenarioB_ClusterUpdate", init, e2e, err) + logging.info( + "Scenario B ClusterUpdate: init=%.2fs e2e=%.2fs ok=%s", + init, + e2e, + err is None, + ) + + def DoCreate(): + init, e2e, err = _TimedAsync( + lambda: cluster.CreateNodePoolAsync(cfg, node_version=initial), + cluster.WaitForOperation, + ) + results.add("ScenarioB_NodePoolCreate", init, e2e, err) + logging.info( + "Scenario B NodePoolCreate: init=%.2fs e2e=%.2fs ok=%s", + init, + e2e, + err is None, + ) + + background_tasks.RunThreaded(lambda fn: fn(), [DoClusterUpdate, DoCreate]) + + samples: list[sample.Sample] = [] + for entry in results.entries: + samples += _OpSamples(entry.name, [entry], attempted_ops=1) + + # Remove test pool (best-effort). + cluster.DeleteNodePool(_SCENARIO_B_NAME) + return samples + + +# --------------------------------------------------------------------------- +# Scenario C +# --------------------------------------------------------------------------- + + +def _RunScenarioC( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, + scale: int, +) -> list[sample.Sample]: + """Large-scale node-pool provisioning at a given scale. + + Streams all `scale` creates through a single executor capped at + _MAX_CONCURRENT workers — as each op completes the next starts immediately + (no batch barriers). Delete uses a live-list so EKS-rolled-back pools are + excluded from the denominator correctly. + """ + logging.info( + "Scenario C: scale=%d, max_concurrent=%d, initial_version=%s", + scale, + _MAX_CONCURRENT.value, + initial, + ) + pool_names = [_ScenarioCName(i) for i in range(scale)] + configs_ = [_MakeNodePoolConfig(cluster, name) for name in pool_names] + samples: list[sample.Sample] = [] + + # ── Creates ────────────────────────────────────────────────────────────── + create_results = _RunAsync( + kickoff=lambda cfg: cluster.CreateNodePoolAsync( + cfg, node_version=initial + ), + wait_fn=cluster.WaitForOperation, + items=configs_, + get_name=lambda cfg: cfg.name, + ) + created_ok = sum(1 for r in create_results if r.error is None) + logging.info( + "Scenario C scale=%d: %d/%d creates succeeded", scale, created_ok, scale + ) + samples += _OpSamples("ScenarioC_Create", create_results, attempted_ops=scale) + + # ── Deletes (live-list) ────────────────────────────────────────────────── + alive = [p for p in cluster.GetNodePoolNames() if p.startswith(f"{_PREFIX}c")] + logging.info( + "Scenario C scale=%d: %d live pools for delete (originally %d;" + + " %d rolled back by cloud)", + scale, + len(alive), + scale, + scale - len(alive), + ) + if not alive: + logging.info("Scenario C scale=%d: all creates rolled back.", scale) + samples += _OpSamples("ScenarioC_Delete", [], attempted_ops=scale) + return samples + + delete_results = _RunAsync( + kickoff=cluster.DeleteNodePoolAsync, + wait_fn=cluster.WaitForOperation, + items=alive, + get_name=str, + ) + # attempted_ops=scale: accurate rate against original request count. + samples += _OpSamples("ScenarioC_Delete", delete_results, attempted_ops=scale) + return samples + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class _Results: + """Thread-safe collector for (name, init_latency, e2e_latency, error).""" + + def __init__(self): + self._lock = threading.Lock() + self.entries: list[_OpResult] = [] + + def add( + self, name: str, init_dur: float, e2e_dur: float, err: Exception | None + ) -> None: + result = _OpResult(name, init_dur, e2e_dur, err) + with self._lock: + self.entries.append(result) + + +def _TimedAsync( + kickoff: Callable[[], str], + wait_fn: Callable[[str], None], +) -> tuple[float, float, Exception | None]: + """Runs kickoff() then wait_fn(handle); returns (init_lat, e2e_lat, err). + + init_lat = time for kickoff() to return (API accepted). + e2e_lat = total wall time including wait. On kickoff failure both are set + to elapsed time at failure point. + """ + init_start = time.monotonic() + try: + handle = kickoff() + except Exception as exc: # pylint: disable=broad-except + elapsed = time.monotonic() - init_start + return elapsed, elapsed, exc + init_dur = time.monotonic() - init_start + try: + wait_fn(handle) + return init_dur, time.monotonic() - init_start, None + except Exception as exc: # pylint: disable=broad-except + return init_dur, time.monotonic() - init_start, exc + + +def _RunAsync( + kickoff: Callable, + wait_fn: Callable[[str], None], + items: list, + get_name: Callable[[object], str], +) -> list[tuple[str, float, float, Exception | None]]: + """Fires kickoff(item) concurrently for all items; returns timed results. + + Uses background_tasks.RunThreaded with a concurrency cap for streaming + execution — completed ops free their slot immediately for the next one. + """ + if not items: + return [] + results = _Results() + cap = min(len(items), _MAX_CONCURRENT.value) + + def DoWrap(item): + init_dur, e2e_dur, err = _TimedAsync(lambda: kickoff(item), wait_fn) + name = get_name(item) + results.add(name, init_dur, e2e_dur, err) + logging.info( + "%s ok=%s initiation=%.2fs end_to_end=%.2fs", + name, + err is None, + init_dur, + e2e_dur, + ) + + background_tasks.RunThreaded(DoWrap, items, max_concurrent_threads=cap) + return results.entries + + +def _MakeNodePoolConfig( + cluster: kubernetes_cluster.KubernetesCluster, + name: str, +) -> container_lib.BaseNodePoolConfig: + """Builds a node-pool config from the cluster's default pool.""" + cfg = copy.copy(cluster.default_nodepool) + cfg.name = name + cfg.num_nodes = _NODES_PER_NODEPOOL.value + cfg.min_nodes = _NODES_PER_NODEPOOL.value + cfg.max_nodes = _NODES_PER_NODEPOOL.value + return cfg + + +def _OpSamples( + metric_prefix: str, + results: list[_OpResult], + attempted_ops: int | None = None, +) -> list[sample.Sample]: + """Per-op + aggregate samples for initiation and end-to-end latency. + + Args: + metric_prefix: prefix for all metric names. + results: list of (operation_name, init_lat, e2e_lat, err). + attempted_ops: total ops originally requested. Used as the denominator + for SuccessRate so EKS-rolled-back pools (which never + appear in results) are counted as failures, not ignored. + If None, len(results) is used (original behavior). + """ + samples: list[sample.Sample] = [] + init_latencies: list[float] = [] + e2e_latencies: list[float] = [] + success = 0 + + for r in results: + if isinstance(r, tuple): + r = _OpResult(*r) + meta = {"operation_name": r.name, "success": str(r.error is None)} + if r.error is not None: + meta["error"] = str(r.error)[:200] + else: + success += 1 + init_latencies.append(r.init_dur) + e2e_latencies.append(r.e2e_dur) + samples.append( + sample.Sample( + f"{metric_prefix}_InitiationLatency", + r.init_dur, + "seconds", + dict(meta), + ) + ) + samples.append( + sample.Sample( + f"{metric_prefix}_EndToEndLatency", r.e2e_dur, "seconds", dict(meta) + ) + ) + + # ── Success rate ───────────────────────────────────────────────────────── + total = attempted_ops if attempted_ops is not None else len(results) + executed = len(results) + if total > 0: + samples.append( + sample.Sample( + f"{metric_prefix}_SuccessRate", + 100.0 * success / total, + "percent", + { + "total_ops": str(total), + "executed_ops": str(executed), + "successful_ops": str(success), + "skipped_ops": str(total - executed), + }, + ) + ) + + # ── Aggregate stats (successful ops only) ──────────────────────────────── + for phase_label, latencies in ( + ("InitiationLatency", init_latencies), + ("EndToEndLatency", e2e_latencies), + ): + if len(latencies) >= 2: + samples += _AggregateSamples(metric_prefix, phase_label, latencies) + if len(latencies) >= 4: + samples += _OutlierSamples(metric_prefix, phase_label, latencies) + + return samples + + +def _AggregateSamples( + metric_prefix: str, phase_label: str, latencies: list[float] +) -> list[sample.Sample]: + """Emits Mean/StdDev/Min/Median/P90/P99/Max samples for a latency series.""" + n = len(latencies) + meta = {"sample_count": str(n)} + + # statistics.quantiles with method='inclusive' matches linear interpolation + # and returns n-1 cut points; index 89→P90, 98→P99. + quantiles = statistics.quantiles(latencies, n=100, method="inclusive") + + stats = [ + ("Mean", statistics.mean(latencies)), + ("StdDev", statistics.pstdev(latencies)), + ("Min", min(latencies)), + ("Median", statistics.median(latencies)), + ("P90", quantiles[89]), + ("P99", quantiles[98]), + ("Max", max(latencies)), + ] + result = [] + for label, value in stats: + result.append( + sample.Sample( + f"{metric_prefix}_{phase_label}_{label}", + value, + "seconds", + dict(meta), + ) + ) + return result + + +def _OutlierSamples( + metric_prefix: str, phase_label: str, latencies: list[float] +) -> list[sample.Sample]: + """Emits a single OutlierCount sample using IQR-fence outlier detection.""" + # statistics.quantiles(n=4) returns [Q1, Q2, Q3]; indices 0 and 2. + quartiles = statistics.quantiles(latencies, n=4, method="inclusive") + q1, q3 = quartiles[0], quartiles[2] + iqr = q3 - q1 + lower_fence = q1 - 1.5 * iqr + upper_fence = q3 + 1.5 * iqr + outlier_count = sum( + 1 for v in latencies if v < lower_fence or v > upper_fence + ) + meta = { + "q1": str(q1), + "q3": str(q3), + "iqr": str(iqr), + "upper_fence": str(upper_fence), + "lower_fence": str(lower_fence), + "sample_count": str(len(latencies)), + } + return [ + sample.Sample( + f"{metric_prefix}_{phase_label}_OutlierCount", + outlier_count, + "count", + meta, + ) + ] diff --git a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py index 9e20f57583..900f710d1b 100644 --- a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py +++ b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py @@ -1,5 +1,6 @@ """Classes related to KubernetesCluster.""" +import abc import functools import json import logging @@ -10,7 +11,9 @@ from perfkitbenchmarker import vm_util from perfkitbenchmarker.configs import container_spec as container_spec_lib from perfkitbenchmarker.resources import kubernetes_inference_server -from perfkitbenchmarker.resources.container_service import container as container_lib +from perfkitbenchmarker.resources.container_service import ( + container as container_lib, +) from perfkitbenchmarker.resources.container_service import container_cluster from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes @@ -49,9 +52,25 @@ def Create(self, restore: bool = False) -> None: self.inference_server.Create() def _PostCreate(self): + """Starts the event poller after the cluster has been created.""" super()._PostCreate() - if self.cluster_spec.poll_for_events: - self.event_poller.StartPolling() + if self.event_poller: + try: + self.event_poller.StartPolling() + except Exception as exc: # pylint: disable=broad-except + # Python 3.14 tightened pickling rules for multiprocessing — local + # functions passed to Process cannot be pickled. Rather than crashing + # PKB entirely (which prevents cleanup and orphans cloud resources), + # log a warning and continue without the event poller. + # Impact: no Kubernetes event streaming during the run — benchmark + # metrics are unaffected. + logging.warning( + 'Event poller failed to start (non-fatal, continuing without ' + + 'event polling): %s. This is a known Python 3.14 pickling ' + + 'issue — switch to Python 3.13 to enable event polling.', + exc, + ) + self.event_poller = None def Delete(self, freeze: bool = False) -> None: if self.inference_server: @@ -297,9 +316,131 @@ def _GetAddressFromIngress(self, ingress_out: str): ) return 'http://' + ip.strip() - def AddNodepool(self, batch_name: str, pool_id: str): - """Adds an additional nodepool with the given name to the cluster.""" - pass + def AddNodepool(self, batch_name: str, pool_id: str) -> None: + """Adds a node pool; delegates to CreateNodePool for standard clusters. + + Karpenter-based subclasses override this to apply a manifest instead. + """ + nodepool_config = container_lib.BaseNodePoolConfig( + self.nodepools[container_cluster.DEFAULT_NODEPOOL].vm_spec, + name=f'{batch_name}-{pool_id}', + ) + self.CreateNodePool(nodepool_config) + + def CreateNodePool( + self, + nodepool_config: container_lib.BaseNodePoolConfig, + node_version: str | None = None, + ) -> None: + """Creates a node pool and blocks until ready (sync wrapper). + + Args: + nodepool_config: Node pool definition (name, machine type, node count). + node_version: Optional Kubernetes version to pin the node pool to. None + means use the cluster default. + """ + op = self.CreateNodePoolAsync(nodepool_config, node_version) + self.WaitForOperation(op) + + def DeleteNodePool(self, name: str) -> None: + """Deletes the named node pool and blocks until removed (sync wrapper).""" + op = self.DeleteNodePoolAsync(name) + self.WaitForOperation(op) + + def UpgradeNodePool(self, name: str, target_version: str) -> None: + """Upgrades the named node pool and blocks until done (sync wrapper).""" + op = self.UpgradeNodePoolAsync(name, target_version) + self.WaitForOperation(op) + + def UpdateCluster(self) -> None: + """Performs a cluster-level update and blocks until done (sync wrapper). + + Intended for management-plane benchmarks that need to overlap a real + cluster-level operation with a node-pool operation. The implementation + should issue a control-plane mutation (so an actual operation runs) that + is non-destructive and idempotent across repeated invocations. + """ + op = self.UpdateClusterAsync() + self.WaitForOperation(op) + + def CreateNodePoolAsync( + self, + nodepool_config: container_lib.BaseNodePoolConfig, + node_version: str | None = None, + ) -> str: + """Initiates node-pool create; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str: + """Initiates node-pool upgrade; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def DeleteNodePoolAsync(self, name: str) -> str: + """Initiates node-pool delete; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def UpdateClusterAsync(self) -> str: + """Initiates cluster-level update. Returns op handle; does NOT wait.""" + raise NotImplementedError + + @abc.abstractmethod + def GetNodePoolNames(self) -> list[str]: + """Returns the names of all node pools currently in the cluster. + + Used by the kubernetes_management benchmark to: + - Sweep stale pkbm* pools before each run (clean-start spec requirement) + - Re-list live pools after creates before deleting (avoids stale names) + """ + + def WaitForOperation(self, op_handle: str) -> None: + """Blocks until the operation identified by op_handle completes. + + Args: + op_handle: provider-specific opaque string from one of the *Async + methods above. + + Raises: + errors.Resource.RetryableCreationError or similar on timeout/failure. + """ + raise NotImplementedError + + def ResolveNodePoolVersions(self) -> tuple[str, str]: + """Returns (initial, target) K8s versions per benchmark spec. + + Spec contract: + target = cluster's current K8s version (the latest available) + initial = the adjacent minor below target (e.g., target=1.35 -> 1.34) + Default implementation returns bare-minor strings ("1.34", "1.35") which + EKS and AKS accept directly. Providers requiring fully-qualified versions + (notably GKE) must override. + """ + target = BareMinor(self.k8s_version) + initial = AdjacentMinorBelow(self.k8s_version) + return initial, target + + +def BareMinor(version: str) -> str: + """Returns the 'major.minor' part of a K8s version string. + + Accepts and normalizes formats like 'v1.35.4', '1.35.4-gke.1234', '1.35'. + """ + if version.startswith('v'): + version = version[1:] + bare = version.split('-', 1)[0] + parts = bare.split('.') + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): + raise ValueError(f'Cannot parse K8s version: {version!r}') + return f'{parts[0]}.{parts[1]}' + + +def AdjacentMinorBelow(version: str) -> str: + """Returns the bare minor one below the given version: '1.35.4' -> '1.34'.""" + bare = BareMinor(version) + major_s, minor_s = bare.split('.') + minor = int(minor_s) + if minor <= 0: + raise ValueError(f'No adjacent minor below {version!r}') + return f'{major_s}.{minor - 1}' def _DeleteAllFromDefaultNamespace(): diff --git a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py index a58ad03497..6bc04ae4e7 100644 --- a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py +++ b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py @@ -11,28 +11,1110 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Tests for linux_benchmarks.kubernetes_management_benchmark.""" +# pylint: disable=invalid-name,protected-access + +import threading +import time import unittest from unittest import mock -from perfkitbenchmarker import benchmark_spec as bm_spec_lib +from absl import flags +from absl.testing import flagsaver +from perfkitbenchmarker import errors +from perfkitbenchmarker import sample from perfkitbenchmarker.linux_benchmarks import kubernetes_management_benchmark +from perfkitbenchmarker.resources.container_service import kubernetes_cluster from tests import pkb_common_test_case +FLAGS = flags.FLAGS + +_CLUSTER_NAME = 'test-cluster' + -class KubernetesManagementBenchmarkTestCase( - pkb_common_test_case.PkbCommonTestCase +def _make_sample(metric, value, unit='seconds', metadata=None): + return sample.Sample(metric, value, unit, metadata or {}) + + +def _make_mock_cluster( + name=_CLUSTER_NAME, + k8s_version='1.34', + pool_names=None, ): + """Creates a fully-stubbed KubernetesCluster mock for use in tests.""" + cluster = mock.create_autospec( + kubernetes_cluster.KubernetesCluster, instance=True + ) + cluster.name = name + cluster.k8s_version = k8s_version + cluster.cluster_version = k8s_version + cluster.GetNodePoolNames.return_value = pool_names or [] + cluster.ResolveNodePoolVersions.return_value = ('1.33', '1.34') + cluster.CreateNodePoolAsync.return_value = 'op-create-1' + cluster.UpgradeNodePoolAsync.return_value = 'op-upgrade-1' + cluster.DeleteNodePoolAsync.return_value = 'op-delete-1' + cluster.UpdateClusterAsync.return_value = 'op-update-1' + cluster.WaitForOperation.return_value = None + default_np = mock.MagicMock() + default_np.machine_type = 'e2-standard-2' + default_np.num_nodes = 1 + default_np.min_nodes = 1 + default_np.max_nodes = 1 + default_np.zone = 'us-central1-a' + default_np.disk_size = 100 + default_np.name = 'default-pool' + cluster.default_nodepool = default_np + return cluster + + +def _make_mock_benchmark_spec(cluster=None): + spec = mock.MagicMock() + spec.container_cluster = cluster or _make_mock_cluster() + return spec + + +def _make_mock_config(cluster_type='Kubernetes'): + cfg = mock.MagicMock() + cfg.container_cluster.type = cluster_type + return cfg + + +class ScenarioNameTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _SCENARIO_A_NAME, _SCENARIO_B_NAME, _SCENARIO_C_NAME.""" + + def testScenarioANameZeroPadsToThreeDigits(self): + self.assertEqual( + 'pkbma000', + kubernetes_management_benchmark._ScenarioAName(0), + ) + + def testScenarioANameTwoDigitIndex(self): + self.assertEqual( + 'pkbma042', + kubernetes_management_benchmark._ScenarioAName(42), + ) + + def testScenarioANameMaxThreeDigits(self): + self.assertEqual( + 'pkbma999', + kubernetes_management_benchmark._ScenarioAName(999), + ) + + def testScenarioBNameIsConstant(self): + self.assertEqual( + 'pkbmb', + kubernetes_management_benchmark._SCENARIO_B_NAME, + ) + + def testScenarioCNameZeroPadsToFourDigits(self): + self.assertEqual( + 'pkbmc0000', + kubernetes_management_benchmark._ScenarioCName(0), + ) + + def testScenarioCNameSingleDigitIndex(self): + self.assertEqual( + 'pkbmc0007', + kubernetes_management_benchmark._ScenarioCName(7), + ) + + def testScenarioCNameFourDigitIndex(self): + self.assertEqual( + 'pkbmc1000', + kubernetes_management_benchmark._ScenarioCName(1000), + ) + + def testAllNamesWithinAksLimit(self): + for i in range(1000): + self.assertLessEqual( + len(kubernetes_management_benchmark._ScenarioAName(i)), 12 + ) + for i in range(10000): + self.assertLessEqual( + len(kubernetes_management_benchmark._ScenarioCName(i)), 12 + ) + self.assertLessEqual( + len(kubernetes_management_benchmark._SCENARIO_B_NAME), 12 + ) + + +class CheckPrerequisitesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the CheckPrerequisites validation function.""" + + def testValidScenariosPass(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['A', 'B', 'C']): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testInvalidScenarioRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['X']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testMixedValidInvalidRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['A', 'Z']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testNonKubernetesClusterTypeRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['A']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites( + _make_mock_config(cluster_type='Mesos') + ) + + def testInvalidScaleSweepRaises(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['C'], k8s_mgmt_scale_sweep=['10', 'abc'] + ): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testValidScaleSweepPasses(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['C'], k8s_mgmt_scale_sweep=['10', '50', '100'] + ): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testLowercaseScenarioRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['a']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + +class PrepareTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Prepare benchmark lifecycle function.""" + + def _patch_kubectl(self, rc=0): + return mock.patch( + 'perfkitbenchmarker.resources.container_service.kubectl' + + '.RunKubectlCommand', + return_value=('', '', rc), + ) + + def testPrepareRunsKubectlSleepPod(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl() as mock_kubectl: + kubernetes_management_benchmark.Prepare(bm_spec) + mock_kubectl.assert_called_once() + args = mock_kubectl.call_args[0][0] + self.assertIn('run', args) + self.assertIn('pkb-mgmt-sleep', args) + self.assertIn('sleep', args) + + def testPrepareSetsAlwaysCallCleanup(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Prepare(bm_spec) + self.assertTrue(bm_spec.always_call_cleanup) + + def testPrepareToleratesKubectlNonZeroReturn(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(rc=1): + kubernetes_management_benchmark.Prepare(bm_spec) + + +class CleanupTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Cleanup benchmark lifecycle function.""" + + def _patch_kubectl(self): + return mock.patch( + 'perfkitbenchmarker.resources.container_service.kubectl' + + '.RunKubectlCommand', + return_value=('', '', 0), + ) + + def testCleanupDeletesSleepPod(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl() as mock_kubectl: + kubernetes_management_benchmark.Cleanup(bm_spec) + delete_calls = [ + str(c) + for c in mock_kubectl.call_args_list + if 'pkb-mgmt-sleep' in str(c) + ] + self.assertNotEmpty(delete_calls) + + def testCleanupDeletesAllPkbmPrefixedPools(self): + cluster = _make_mock_cluster( + pool_names=['pkbma000', 'default-pool', 'pkbmc0001'] + ) + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Cleanup(bm_spec) + deleted = {c.args[0] for c in cluster.DeleteNodePool.call_args_list} + self.assertIn('pkbma000', deleted) + self.assertIn('pkbmc0001', deleted) + self.assertNotIn('default-pool', deleted) + + def testCleanupSkipsDeleteWhenNoLeftoverPools(self): + cluster = _make_mock_cluster(pool_names=['default-pool']) + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Cleanup(bm_spec) + cluster.DeleteNodePool.assert_not_called() + + def testCleanupHandlesNoneCluster(self): + bm_spec = _make_mock_benchmark_spec() + bm_spec.container_cluster = None + kubernetes_management_benchmark.Cleanup(bm_spec) + + +class CleanStartSweepTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _CleanStartSweep helper function.""" + + def testDeletesStalePkbmPools(self): + cluster = _make_mock_cluster( + pool_names=['pkbma000', 'pkbmc0001', 'user-pool'] + ) + kubernetes_management_benchmark._CleanStartSweep(cluster) + deleted = {c.args[0] for c in cluster.DeleteNodePool.call_args_list} + self.assertIn('pkbma000', deleted) + self.assertIn('pkbmc0001', deleted) + self.assertNotIn('user-pool', deleted) + + def testDoesNothingWhenNoPkbmPools(self): + cluster = _make_mock_cluster(pool_names=['user-pool', 'default-pool']) + kubernetes_management_benchmark._CleanStartSweep(cluster) + cluster.DeleteNodePool.assert_not_called() + + def testCleanStartSweepRaisesOnGetNodePoolNamesException(self): + cluster = _make_mock_cluster() + cluster.GetNodePoolNames.side_effect = RuntimeError('API error') + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._CleanStartSweep(cluster) + + +class ResultsTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _Results result-accumulator helper.""" + + def testAddSingleEntry(self): + r = kubernetes_management_benchmark._Results() + r.add('op1', 0.1, 1.0, None) + self.assertLen(r.entries, 1) + name, init, e2e, err = r.entries[0] + self.assertEqual('op1', name) + self.assertAlmostEqual(0.1, init, places=5) + self.assertAlmostEqual(1.0, e2e, places=5) + self.assertIsNone(err) + + def testAddMultipleEntries(self): + r = kubernetes_management_benchmark._Results() + r.add('op1', 0.1, 1.0, None) + r.add('op2', 0.2, 2.0, ValueError('fail')) + self.assertLen(r.entries, 2) + + def testAddIsThreadSafe(self): + """Tests that concurrent add() calls from multiple threads are safe.""" + r = kubernetes_management_benchmark._Results() + n = 100 + + def _add(i): + r.add(f'op{i}', float(i), float(i) * 2, None) + + threads = [threading.Thread(target=_add, args=(i,)) for i in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertLen(r.entries, n) + + def testAddPreservesError(self): + r = kubernetes_management_benchmark._Results() + exc = RuntimeError('test error') + r.add('failing-op', 0.5, 0.5, exc) + _, _, _, err = r.entries[0] + self.assertIs(exc, err) + + +class TimedAsyncTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _TimedAsync timing helper.""" + + def testSuccessfulKickoffAndWait(self): + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(return_value=None) + init_lat, e2e_lat, err = kubernetes_management_benchmark._TimedAsync( + kickoff, wait_fn + ) + kickoff.assert_called_once() + wait_fn.assert_called_once_with('op-handle') + self.assertIsNone(err) + self.assertGreaterEqual(init_lat, 0.0) + self.assertGreaterEqual(e2e_lat, init_lat) + + def testKickoffFailureReturnsError(self): + exc = RuntimeError('kickoff failed') + kickoff = mock.Mock(side_effect=exc) + wait_fn = mock.Mock() + init_lat, e2e_lat, err = kubernetes_management_benchmark._TimedAsync( + kickoff, wait_fn + ) + self.assertIs(exc, err) + wait_fn.assert_not_called() + self.assertAlmostEqual(init_lat, e2e_lat, places=2) + + def testWaitFailureReturnsError(self): + exc = RuntimeError('wait failed') + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(side_effect=exc) + _, e2e_lat, err = kubernetes_management_benchmark._TimedAsync( + kickoff, wait_fn + ) + self.assertIs(exc, err) + self.assertGreater(e2e_lat, 0.0) + + def testInitLatencyNotGreaterThanE2eLatency(self): + kickoff = mock.Mock(return_value='handle') + wait_fn = mock.Mock(side_effect=lambda _: time.sleep(0.01)) + init_lat, e2e_lat, err = kubernetes_management_benchmark._TimedAsync( + kickoff, wait_fn + ) + self.assertIsNone(err) + self.assertLessEqual(init_lat, e2e_lat) + + def testHandlePassedToWaitFn(self): + kickoff = mock.Mock(return_value='my-op-handle') + wait_fn = mock.Mock() + kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + wait_fn.assert_called_once_with('my-op-handle') + + +class RunAsyncTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunAsync concurrent execution helper.""" + + def testEmptyItemsReturnsEmptyList(self): + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(), + wait_fn=mock.Mock(), + items=[], + get_name=str, + ) + self.assertEmpty(results) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testReturnsOneResultPerItem(self): + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(return_value=None) + results = kubernetes_management_benchmark._RunAsync( + kickoff=kickoff, wait_fn=wait_fn, items=['a', 'b', 'c'], get_name=str + ) + self.assertLen(results, 3) + self.assertEqual({'a', 'b', 'c'}, {name for name, _, _, _ in results}) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testKickoffErrorCapturedInResults(self): + kickoff = mock.Mock(side_effect=RuntimeError('kaboom')) + results = kubernetes_management_benchmark._RunAsync( + kickoff=kickoff, wait_fn=mock.Mock(), items=['x'], get_name=str + ) + self.assertLen(results, 1) + _, _, _, err = results[0] + self.assertIsNotNone(err) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=2) + def testConcurrencyCapDoesNotDropItems(self): + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(return_value='op'), + wait_fn=mock.Mock(return_value=None), + items=list(range(5)), + get_name=str, + ) + self.assertLen(results, 5) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testGetNameCallableApplied(self): + cfg = mock.MagicMock() + cfg.name = 'poolname' + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(return_value='h'), + wait_fn=mock.Mock(), + items=[cfg], + get_name=lambda c: c.name, + ) + name, _, _, _ = results[0] + self.assertEqual('poolname', name) + + +class MakeNodePoolConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _MakeNodePoolConfig factory.""" + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=3) + def testNameIsSet(self): + cluster = _make_mock_cluster() + cfg = kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'mypool') + self.assertEqual('mypool', cfg.name) + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=3) + def testNumNodesComesFromFlag(self): + cluster = _make_mock_cluster() + cfg = kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'p') + self.assertEqual(3, cfg.num_nodes) + self.assertEqual(3, cfg.min_nodes) + self.assertEqual(3, cfg.max_nodes) + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=1) + def testDoesNotMutateDefaultNodepool(self): + cluster = _make_mock_cluster() + original_name = cluster.default_nodepool.name + kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'newname') + self.assertEqual(original_name, cluster.default_nodepool.name) + - def setUp(self): - super().setUp() - self.bm_spec = mock.create_autospec( - bm_spec_lib.BenchmarkSpec, instance=True +class OpSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _OpSamples sample-generation helper.""" + + def testEmptyResultsYieldsSuccessRateOfZero(self): + samples = kubernetes_management_benchmark._OpSamples( + 'PrefixOp', [], attempted_ops=5 + ) + rate = next(s for s in samples if s.metric == 'PrefixOp_SuccessRate') + self.assertEqual(0.0, rate.value) + + def testPerOpInitiationAndE2eSamplesGenerated(self): + results = [('op1', 0.1, 1.0, None), ('op2', 0.2, 2.0, None)] + samples = kubernetes_management_benchmark._OpSamples( + 'MyOp', results, attempted_ops=2 + ) + metrics = [s.metric for s in samples] + self.assertIn('MyOp_InitiationLatency', metrics) + self.assertIn('MyOp_EndToEndLatency', metrics) + + def testSuccessRateHundredPercentWhenAllSucceed(self): + results = [('op1', 1.0, 2.0, None), ('op2', 0.5, 1.5, None)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=2 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertAlmostEqual(100.0, rate.value) + + def testSuccessRateFiftyPercentWhenHalfFail(self): + results = [ + ('op1', 1.0, 2.0, None), + ('op2', 0.5, 0.5, RuntimeError('fail')), + ] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=2 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertAlmostEqual(50.0, rate.value) + + def testAttemptedOpsExceedingExecutedOpsLowersRate(self): + results = [('op1', 1.0, 2.0, None)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=3 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertAlmostEqual(100.0 / 3, rate.value, places=3) + + def testSuccessRateMetadataFields(self): + results = [('op1', 1.0, 2.0, None), ('op2', 0.5, 0.5, Exception('err'))] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=3 + ) + rate = next(s for s in samples if s.metric == 'Op_SuccessRate') + self.assertEqual('3', rate.metadata['total_ops']) + self.assertEqual('2', rate.metadata['executed_ops']) + self.assertEqual('1', rate.metadata['successful_ops']) + self.assertEqual('1', rate.metadata['skipped_ops']) + + def testFailedOpIncludesErrorMessage(self): + results = [('fail-op', 0.5, 0.5, RuntimeError('oops'))] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=1 + ) + init_s = next(s for s in samples if s.metric == 'Op_InitiationLatency') + self.assertIn('error', init_s.metadata) + self.assertIn('oops', init_s.metadata['error']) + + def testAggregatesGeneratedForTwoOrMoreSuccesses(self): + results = [(f'op{i}', float(i), float(i) * 2, None) for i in range(1, 4)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=3 + ) + metrics = [s.metric for s in samples] + self.assertIn('Op_InitiationLatency_Mean', metrics) + self.assertIn('Op_EndToEndLatency_Mean', metrics) + + def testAggregatesNotGeneratedForSingleSuccess(self): + results = [('op1', 1.0, 2.0, None)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=1 + ) + self.assertNotIn('Op_InitiationLatency_Mean', [s.metric for s in samples]) + + def testOutliersGeneratedForFourOrMoreSuccesses(self): + results = [(f'op{i}', float(i), float(i) * 2, None) for i in range(1, 6)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=5 + ) + metrics = [s.metric for s in samples] + self.assertIn('Op_InitiationLatency_OutlierCount', metrics) + self.assertIn('Op_EndToEndLatency_OutlierCount', metrics) + + def testOutliersNotGeneratedForThreeOrFewerSuccesses(self): + results = [(f'op{i}', float(i), float(i) * 2, None) for i in range(1, 4)] + samples = kubernetes_management_benchmark._OpSamples( + 'Op', results, attempted_ops=3 + ) + self.assertNotIn( + 'Op_InitiationLatency_OutlierCount', [s.metric for s in samples] + ) + + +class AggregateSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _AggregateSamples statistics helper.""" + + def testProducesAllExpectedStatMetrics(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Pfx', 'InitiationLatency', [1.0, 2.0, 3.0, 4.0, 5.0] + ) + metrics = {s.metric for s in samples} + for label in ('Mean', 'StdDev', 'Min', 'Median', 'P90', 'P99', 'Max'): + self.assertIn(f'Pfx_InitiationLatency_{label}', metrics) + + def testMeanValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] + ) + mean_s = next(s for s in samples if 'Mean' in s.metric) + self.assertAlmostEqual(3.0, mean_s.value, places=3) + + def testMinValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [10.0, 20.0, 30.0] + ) + min_s = next(s for s in samples if 'Min' in s.metric) + self.assertAlmostEqual(10.0, min_s.value, places=3) + + def testMaxValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [10.0, 20.0, 30.0] + ) + max_s = next(s for s in samples if 'Max' in s.metric) + self.assertAlmostEqual(30.0, max_s.value, places=3) + + def testSampleCountInMetadata(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0] + ) + for s in samples: + self.assertEqual('3', s.metadata.get('sample_count')) + + def testUnitsAreSeconds(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0] + ) + for s in samples: + self.assertEqual('seconds', s.unit) + + +class OutlierSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _OutlierSamples IQR-based outlier detection helper.""" + + def testNoOutliersYieldsZeroCount(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 1.1, 1.2, 1.3, 1.4, 1.5] + ) + self.assertLen(samples, 1) + self.assertEqual(0, samples[0].value) + + def testClearOutlierDetected(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 1.0, 1.0, 1.0, 100.0] + ) + self.assertEqual(1, samples[0].value) + + def testMetricNameFormatted(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'MyPrefix', 'InitiationLatency', [1.0, 2.0, 3.0, 4.0] + ) + self.assertEqual( + 'MyPrefix_InitiationLatency_OutlierCount', samples[0].metric + ) + + def testMetadataContainsFenceFields(self): + """Tests that outlier samples contain fence metadata fields.""" + meta = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] + )[0].metadata + for field in ( + 'q1', + 'q3', + 'iqr', + 'upper_fence', + 'lower_fence', + 'sample_count', + ): + self.assertIn(field, meta) + + def testSampleCountInMetadata(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] ) + self.assertEqual('5', samples[0].metadata['sample_count']) - def testRun(self): - samples = kubernetes_management_benchmark.Run(self.bm_spec) - self.assertEqual(samples, []) + def testUnitIsCount(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0] + ) + self.assertEqual('count', samples[0].unit) + + def testReturnsSingleSample(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', list(range(1, 11)) + ) + self.assertLen(samples, 1) + + +class RunTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Run benchmark entry-point function.""" + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['A', 'B', 'C'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunCallsCleanStartSweep(self): + """Tests that Run invokes _CleanStartSweep before executing scenarios.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ) as mock_clean, mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ): + kubernetes_management_benchmark.Run(bm_spec) + self.assertEqual(mock_clean.call_count, 2) + mock_clean.assert_called_with(cluster) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['A'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunOnlyScenarioACallsOnlyA(self): + """Tests that Run only calls _RunScenarioA when scenarios=['A'].""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ) as mock_a, mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ) as mock_b, mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + mock_a.assert_called_once() + mock_b.assert_not_called() + mock_c.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['B'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunOnlyScenarioBCallsOnlyB(self): + """Tests that Run only calls _RunScenarioB when scenarios=['B'].""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ) as mock_a, mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ) as mock_b, mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + mock_a.assert_not_called() + mock_b.assert_called_once() + mock_c.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['C'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=42, + ) + def testRunScenarioCPassesLargeScaleFlag(self): + """Tests that Run passes the large-scale-nodepools flag to _RunScenarioC.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + mock_c.assert_called_once() + _, _, scale = mock_c.call_args.args + self.assertEqual(42, scale) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['C'], + k8s_mgmt_scale_sweep=['10', '50'], + k8s_mgmt_large_scale_nodepools=100, + ) + def testRunScenarioCScaleSweepRunsTwice(self): + """Tests that Run calls _RunScenarioC once per scale in the sweep.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunScenarioC', + return_value=[_make_sample('m', 1.0)], + ) as mock_c: + kubernetes_management_benchmark.Run(bm_spec) + self.assertEqual(2, mock_c.call_count) + scales = [call.args[2] for call in mock_c.call_args_list] + self.assertIn(10, scales) + self.assertIn(50, scales) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['C'], + k8s_mgmt_scale_sweep=['10'], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunTagsScenarioCScaleInMetadata(self): + """Tests that Run adds scenario_c_scale to each sample's metadata.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + test_sample = _make_sample('metric', 1.0) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioA', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunScenarioC', + return_value=[test_sample], + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + self.assertIn('scenario_c_scale', samples[0].metadata) + self.assertEqual('10', samples[0].metadata['scenario_c_scale']) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['A'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunTagsAllSamplesWithRunMetadata(self): + """Tests that Run adds version and config keys to all sample metadata.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + test_sample = _make_sample('m', 1.0) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunScenarioA', + return_value=[test_sample], + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + meta = samples[0].metadata + for key in ( + 'initial_version', + 'target_version', + 'cluster_k8s_version', + 'nodes_per_nodepool', + 'concurrent_nodepools', + ): + self.assertIn(key, meta) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['A'], + k8s_mgmt_initial_version='1.30', + k8s_mgmt_target_version='1.31', + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunUsesExplicitVersionFlags(self): + """Tests that Run uses explicit version flags over auto-resolved ones.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunScenarioA', + return_value=[_make_sample('m', 1.0)], + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + cluster.ResolveNodePoolVersions.assert_not_called() + self.assertEqual('1.30', samples[0].metadata['initial_version']) + self.assertEqual('1.31', samples[0].metadata['target_version']) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['A'], + k8s_mgmt_scale_sweep=[], + k8s_mgmt_large_scale_nodepools=10, + ) + def testRunAutoResolvesVersionsWhenFlagsAbsent(self): + """Tests Run calls ResolveNodePoolVersions when version flags absent.""" + cluster = _make_mock_cluster() + cluster.ResolveNodePoolVersions.return_value = ('1.33', '1.34') + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_CleanStartSweep' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunScenarioA', + return_value=[_make_sample('m', 1.0)], + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioB', return_value=[] + ), mock.patch.object( + kubernetes_management_benchmark, '_RunScenarioC', return_value=[] + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + cluster.ResolveNodePoolVersions.assert_called_once() + self.assertEqual('1.33', samples[0].metadata['initial_version']) + self.assertEqual('1.34', samples[0].metadata['target_version']) + + +class RunScenarioATest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunScenarioA phase-by-phase and pipelined modes.""" + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + k8s_mgmt_pipeline_scenario_a=False, + ) + def testPhaseByPhaseProducesCreateUpgradeDeleteSamples(self): + """Tests Scenario A produces Create, Upgrade, and Delete samples.""" + cluster = _make_mock_cluster(pool_names=['pkbma000', 'pkbma001']) + samples = kubernetes_management_benchmark._RunScenarioA( + cluster, '1.33', '1.34' + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('ScenarioA_Create' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Upgrade' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Delete' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + k8s_mgmt_pipeline_scenario_a=False, + ) + def testPhaseByPhasePassesInitialVersionToCreate(self): + """Tests _RunScenarioA passes initial_version to CreateNodePoolAsync.""" + cluster = _make_mock_cluster(pool_names=['pkbma000', 'pkbma001']) + kubernetes_management_benchmark._RunScenarioA(cluster, '1.33', '1.34') + for call in cluster.CreateNodePoolAsync.call_args_list: + kw = call.kwargs if call.kwargs else {} + pos = call.args + node_version = kw.get('node_version') or ( + pos[1] if len(pos) > 1 else None + ) + self.assertEqual('1.33', node_version) + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + k8s_mgmt_pipeline_scenario_a=False, + ) + def testPhaseByPhaseDeleteUsesLivePoolList(self): + """Tests that _RunScenarioA deletes only the pools it finds at runtime.""" + cluster = _make_mock_cluster(pool_names=['pkbma000']) + kubernetes_management_benchmark._RunScenarioA(cluster, '1.33', '1.34') + self.assertEqual(1, cluster.DeleteNodePoolAsync.call_count) + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + k8s_mgmt_pipeline_scenario_a=True, + ) + def testPipelinedModeActivatedByFlag(self): + """Tests pipelined mode is activated by the pipeline_scenario_a flag.""" + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._RunScenarioA( + cluster, '1.33', '1.34' + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('ScenarioA_Create' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Upgrade' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Delete' in m for m in metrics)) + + +class RunScenarioAPipelinedTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunScenarioAPipelined pipelined execution path.""" + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testPipelinedProducesAllThreePhases(self): + """Tests pipelined Scenario A produces Create/Upgrade/Delete samples.""" + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._RunScenarioAPipelined( + cluster, n=2, initial='1.33', target='1.34' + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('ScenarioA_Create' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Upgrade' in m for m in metrics)) + self.assertTrue(any('ScenarioA_Delete' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testPipelinedSkipsUpgradeAfterCreateFailure(self): + """Tests pipelined mode skips upgrade when create fails.""" + cluster = _make_mock_cluster(pool_names=[]) + cluster.CreateNodePoolAsync.side_effect = RuntimeError('create failed') + samples = kubernetes_management_benchmark._RunScenarioAPipelined( + cluster, n=1, initial='1.33', target='1.34' + ) + cluster.UpgradeNodePoolAsync.assert_not_called() + upgrade_rate = next( + (s for s in samples if s.metric == 'ScenarioA_Upgrade_SuccessRate'), + None, + ) + if upgrade_rate is not None: + self.assertEqual(0.0, upgrade_rate.value) + + +class RunScenarioBTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunScenarioB cluster-update + nodepool-create scenario.""" + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testProducesClusterUpdateAndNodePoolCreateSamples(self): + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._RunScenarioB(cluster, '1.33') + metrics = {s.metric for s in samples} + self.assertTrue(any('ScenarioB_ClusterUpdate' in m for m in metrics)) + self.assertTrue(any('ScenarioB_NodePoolCreate' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeletesTestPoolAfterRun(self): + cluster = _make_mock_cluster(pool_names=[]) + kubernetes_management_benchmark._RunScenarioB(cluster, '1.33') + cluster.DeleteNodePool.assert_called_once_with( + kubernetes_management_benchmark._SCENARIO_B_NAME + ) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeleteFailureRaisesInScenarioB(self): + cluster = _make_mock_cluster(pool_names=[]) + cluster.DeleteNodePool.side_effect = RuntimeError('delete failed') + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._RunScenarioB(cluster, '1.33') + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testPassesInitialVersionToCreate(self): + """Tests _RunScenarioB passes initial_version to CreateNodePoolAsync.""" + cluster = _make_mock_cluster(pool_names=[]) + kubernetes_management_benchmark._RunScenarioB(cluster, '1.33') + for call in cluster.CreateNodePoolAsync.call_args_list: + kw = call.kwargs if call.kwargs else {} + pos = call.args + node_version = kw.get('node_version') or ( + pos[1] if len(pos) > 1 else None + ) + self.assertEqual('1.33', node_version) + + +class RunScenarioCTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunScenarioC large-scale create-and-delete scenario.""" + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testProducesCreateAndDeleteSamples(self): + cluster = _make_mock_cluster(pool_names=['pkbmc0000', 'pkbmc0001']) + samples = kubernetes_management_benchmark._RunScenarioC( + cluster, '1.33', scale=2 + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('ScenarioC_Create' in m for m in metrics)) + self.assertTrue(any('ScenarioC_Delete' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testZeroLivePoolsRecordsZeroDeleteSuccessRate(self): + """Tests Scenario C records 0% delete rate when no live pools exist.""" + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._RunScenarioC( + cluster, '1.33', scale=3 + ) + delete_rate = next( + s for s in samples if s.metric == 'ScenarioC_Delete_SuccessRate' + ) + self.assertEqual(0.0, delete_rate.value) + cluster.DeleteNodePoolAsync.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeleteUsesLiveListNotOriginalCreateList(self): + cluster = _make_mock_cluster(pool_names=['pkbmc0000', 'pkbmc0001']) + kubernetes_management_benchmark._RunScenarioC(cluster, '1.33', scale=3) + self.assertEqual(2, cluster.DeleteNodePoolAsync.call_count) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testCreateSuccessRateUsesScaleAsDenominator(self): + """Tests Scenario C create success rate uses scale as total_ops.""" + cluster = _make_mock_cluster(pool_names=['pkbmc0000']) + samples = kubernetes_management_benchmark._RunScenarioC( + cluster, '1.33', scale=3 + ) + create_rate = next( + s for s in samples if s.metric == 'ScenarioC_Create_SuccessRate' + ) + self.assertLessEqual(create_rate.value, 100.0) + self.assertEqual('3', create_rate.metadata['total_ops']) if __name__ == '__main__':