|
6 | 6 | from .jmx import jmxexport |
7 | 7 | from .testutils import setup_sparksession |
8 | 8 |
|
| 9 | + |
| 10 | +LINE_END = r"(?:\r?\n|$)" |
| 11 | +NUM = r"[0-9]+(?:\.[0-9]+)?" # integer or float |
| 12 | + |
| 13 | + |
| 14 | +def _must_match(pattern: str, text: str, msg: str): |
| 15 | + """Assert that regex pattern matches text; show diagnostic output on failure.""" |
| 16 | + m = re.search(pattern, text, re.MULTILINE | re.DOTALL) |
| 17 | + assert m, f"{msg}\n--- Pattern ---\n{pattern}\n--- Output ---\n{text}" |
| 18 | + return m |
| 19 | + |
| 20 | + |
9 | 21 | def test_jmxexport(setup_sparksession): |
10 | 22 | spark = setup_sparksession |
| 23 | + |
| 24 | + # Keep small, deterministic-ish plan for CI |
11 | 25 | conf = spark.sparkContext.getConf() |
12 | 26 | conf.set("spark.sql.shuffle.partitions", "2") |
13 | 27 | conf.set("spark.default.parallelism", "2") |
14 | | - stagemetrics = StageMetrics(spark) |
15 | 28 |
|
| 29 | + stagemetrics = StageMetrics(spark) |
16 | 30 | stagemetrics.begin() |
17 | | - spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show() |
| 31 | + # Non-trivial job to produce meaningful metrics |
| 32 | + spark.sql( |
| 33 | + "select count(*) from range(1000) cross join range(1000) cross join range(1000)" |
| 34 | + ).show() |
18 | 35 | stagemetrics.end() |
19 | 36 |
|
20 | | - # print report to standard output |
| 37 | + # Report (useful in CI logs) |
21 | 38 | stagemetrics.print_report() |
22 | 39 |
|
23 | | - # get metrics data as a dictionary |
| 40 | + # Basic sanity on aggregated metrics |
24 | 41 | metrics = stagemetrics.aggregate_stagemetrics() |
25 | | - print(f"metrics elapsedTime = {metrics.get('elapsedTime')}") |
26 | | - assert metrics.get('elapsedTime') > 0 |
| 42 | + assert metrics.get("elapsedTime", 0) > 0, "elapsedTime should be > 0" |
27 | 43 |
|
| 44 | + # Export via Dropwizard/JMX and capture string description |
28 | 45 | jmxexport(spark, metrics) |
29 | | - |
30 | 46 | dropwizard = spark._jvm.ch.cern.metrics.DropwizardMetrics |
| 47 | + output = dropwizard.describeMetrics() |
| 48 | + print(output) # leave for CI diagnostics |
31 | 49 |
|
32 | | - # Regex pour matcher la sortie attendue avec une étoile pour resultSize |
33 | | - expected_regex = ( |
34 | | - r"Metrics for namespace 'unknown' and pod 'unknown'\n" |
35 | | - r"Total gauges: 2\n" |
36 | | - r"Gauge: unknown\.unknown\.resultSize = [0-9]+(\.[0-9]+)?\n" |
37 | | - r"Gauge: unknown\.unknown\.peakExecutionMemory = 0\.0\n" |
38 | | - r"Total counters: 19\n" |
39 | | - r"Counter: unknown\.unknown\.bytesRead = 0\n" |
40 | | - r"Counter: unknown\.unknown\.numStages = 3\n" |
41 | | - r"Counter: unknown\.unknown\.shuffleRecordsWritten = [0-9]+\n" |
42 | | - r"Counter: unknown\.unknown\.shuffleRemoteBytesRead = 0\n" |
43 | | - r"Counter: unknown\.unknown\.shuffleLocalBlocksFetched = [0-9]+\n" |
44 | | - r"Counter: unknown\.unknown\.shuffleTotalBlocksFetched = [0-9]+\n" |
45 | | - r"Counter: unknown\.unknown\.memoryBytesSpilled = 0\n" |
46 | | - r"Counter: unknown\.unknown\.bytesWritten = 0\n" |
47 | | - r"Counter: unknown\.unknown\.numTasks = [0-9]+\n" |
48 | | - r"Counter: unknown\.unknown\.recordsWritten = 0\n" |
49 | | - r"Counter: unknown\.unknown\.shuffleRecordsRead = [0-9]+\n" |
50 | | - r"Counter: unknown\.unknown\.recordsRead = 2000\n" |
51 | | - r"Counter: unknown\.unknown\.shuffleLocalBytesRead = [0-9]+\n" |
52 | | - r"Counter: unknown\.unknown\.shuffleBytesWritten = [0-9]+\n" |
53 | | - r"Counter: unknown\.unknown\.shuffleTotalBytesRead = [0-9]+\n" |
54 | | - r"Counter: unknown\.unknown\.metrics_published_total = 30\n" |
55 | | - r"Counter: unknown\.unknown\.diskBytesSpilled = 0\n" |
56 | | - r"Counter: unknown\.unknown\.shuffleRemoteBytesReadToDisk = 0\n" |
57 | | - r"Counter: unknown\.unknown\.shuffleRemoteBlocksFetched = 0\n" |
58 | | - r"Total timers: 10\n" |
59 | | - r"Timer: unknown.unknown.shuffleWriteTime = count=1\n" |
60 | | - r"Timer: unknown.unknown.stageDuration = count=1\n" |
61 | | - r"Timer: unknown.unknown.executorCpuTime = count=1\n" |
62 | | - r"Timer: unknown.unknown.shuffleFetchWaitTime = count=1\n" |
63 | | - r"Timer: unknown.unknown.executorRunTime = count=1\n" |
64 | | - r"Timer: unknown.unknown.jvmGCTime = count=1\n" |
65 | | - r"Timer: unknown.unknown.elapsedTime = count=1\n" |
66 | | - r"Timer: unknown.unknown.executorDeserializeCpuTime = count=1\n" |
67 | | - r"Timer: unknown.unknown.resultSerializationTime = count=1\n" |
68 | | - r"Timer: unknown.unknown.executorDeserializeTime = count=1\n" |
| 50 | + # 1) Header: capture namespace/pod (can be 'unknown' or a real name) |
| 51 | + header_pat = ( |
| 52 | + r"Metrics for namespace '([A-Za-z0-9._-]+)' and pod '([A-Za-z0-9._-]+)'" + LINE_END |
69 | 53 | ) |
| 54 | + m = _must_match(header_pat, output, "Missing or malformed header with namespace/pod") |
| 55 | + ns, pod = m.group(1), m.group(2) |
70 | 56 |
|
71 | | - output = dropwizard.describeMetrics() |
72 | | - print(output) |
73 | | - assert re.search(expected_regex, output, re.MULTILINE | re.DOTALL), f"Output does not match expected regex.\nGot:\n{output}\nWant:\n{expected_regex}" |
| 57 | + # 2) Totals present (counts may vary) |
| 58 | + _must_match(r"Total gauges: \d+" + LINE_END, output, "Missing 'Total gauges' line") |
| 59 | + _must_match(r"Total counters: \d+" + LINE_END, output, "Missing 'Total counters' line") |
| 60 | + _must_match(r"Total timers: \d+" + LINE_END, output, "Missing 'Total timers' line") |
| 61 | + |
| 62 | + # 3) Gauges: order may vary, so assert independently |
| 63 | + gauge_prefix = rf"Gauge: {re.escape(ns)}\.{re.escape(pod)}\." |
| 64 | + _must_match( |
| 65 | + gauge_prefix + rf"resultSize = {NUM}" + LINE_END, |
| 66 | + output, |
| 67 | + "Missing 'resultSize' gauge", |
| 68 | + ) |
| 69 | + _must_match( |
| 70 | + gauge_prefix + rf"peakExecutionMemory = {NUM}" + LINE_END, |
| 71 | + output, |
| 72 | + "Missing 'peakExecutionMemory' gauge", |
| 73 | + ) |
| 74 | + |
| 75 | + # 4) Counters: assert presence (values vary by env) |
| 76 | + counters = [ |
| 77 | + "bytesRead", |
| 78 | + "bytesWritten", |
| 79 | + "numStages", |
| 80 | + "numTasks", |
| 81 | + "recordsRead", |
| 82 | + "recordsWritten", |
| 83 | + "shuffleRecordsRead", |
| 84 | + "shuffleRecordsWritten", |
| 85 | + "shuffleLocalBlocksFetched", |
| 86 | + "shuffleTotalBlocksFetched", |
| 87 | + "shuffleLocalBytesRead", |
| 88 | + "shuffleBytesWritten", |
| 89 | + "shuffleTotalBytesRead", |
| 90 | + "memoryBytesSpilled", |
| 91 | + "diskBytesSpilled", |
| 92 | + "shuffleRemoteBlocksFetched", |
| 93 | + "shuffleRemoteBytesRead", |
| 94 | + "shuffleRemoteBytesReadToDisk", |
| 95 | + "metrics_published_total", |
| 96 | + ] |
| 97 | + for cname in counters: |
| 98 | + _must_match( |
| 99 | + rf"Counter: {re.escape(ns)}\.{re.escape(pod)}\.{re.escape(cname)} = \d+" |
| 100 | + + LINE_END, |
| 101 | + output, |
| 102 | + f"Missing counter '{cname}'", |
| 103 | + ) |
| 104 | + |
| 105 | + # 5) Timers: ensure core timers exist; counts may vary |
| 106 | + timers = [ |
| 107 | + "elapsedTime", |
| 108 | + "stageDuration", |
| 109 | + "executorRunTime", |
| 110 | + "executorCpuTime", |
| 111 | + "executorDeserializeTime", |
| 112 | + "executorDeserializeCpuTime", |
| 113 | + "resultSerializationTime", |
| 114 | + "jvmGCTime", |
| 115 | + "shuffleWriteTime", |
| 116 | + "shuffleFetchWaitTime", |
| 117 | + ] |
| 118 | + for tname in timers: |
| 119 | + _must_match( |
| 120 | + rf"Timer: {re.escape(ns)}\.{re.escape(pod)}\.{re.escape(tname)} = count=\d+" |
| 121 | + + LINE_END, |
| 122 | + output, |
| 123 | + f"Missing timer '{tname}'", |
| 124 | + ) |
74 | 125 |
|
75 | 126 | spark.stop() |
| 127 | + |
0 commit comments