Skip to content

Commit 7a00f66

Browse files
committed
Minor fixes to StageMetris and TaskMetrics
1 parent 53feb16 commit 7a00f66

3 files changed

Lines changed: 11 additions & 9 deletions

File tree

build.sbt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))
1111
publishMavenStyle := true
1212
isSnapshot := true
1313

14-
// deprecations
15-
// scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlint")
16-
1714
val testDeps = Seq(
1815
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
1916
"org.scalatest" %% "scalatest-shouldmatchers" % "3.2.19" % Test,

src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package ch.cern.sparkmeasure
33
import org.apache.spark.sql.{DataFrame, SparkSession}
44
import org.slf4j.LoggerFactory
55

6-
import scala.collection.JavaConverters.mapAsJavaMap
76
import scala.collection.mutable.{LinkedHashMap, ListBuffer}
7+
import scala.jdk.CollectionConverters._
88
import scala.math.{max, min}
99

1010
/**
@@ -109,7 +109,7 @@ case class StageMetrics(sparkSession: SparkSession) {
109109

110110
// Transforms aggregateStageMetrics output in a Java Map, needed by the Python API
111111
def aggregateStageMetricsJavaMap(): java.util.Map[String, Long] = {
112-
mapAsJavaMap(aggregateStageMetrics())
112+
aggregateStageMetrics().asJava
113113
}
114114

115115
// Extracts stages and their duration
@@ -167,7 +167,7 @@ case class StageMetrics(sparkSession: SparkSession) {
167167
def reportMemory(): String = {
168168
import scala.collection.mutable.ListBuffer
169169

170-
var result = ListBuffer[String]()
170+
val result = ListBuffer[String]()
171171
val stages = listenerStage.stageMetricsData.map(_.stageId).sorted
172172

173173
// Append detailed stage-level executor metrics,
@@ -177,7 +177,12 @@ case class StageMetrics(sparkSession: SparkSession) {
177177
stages.foreach { stageId =>
178178
executorMetricsNames.foreach { metric =>
179179
try {
180-
val stageExecutorMetricsRaw = listenerStage.stageIdtoExecutorMetrics(stageId, metric)
180+
val stageExecutorMetricsRaw = listenerStage.stageIdtoExecutorMetrics.get((stageId, metric)) match {
181+
case Some(metrics) => metrics
182+
case None =>
183+
logger.warn(s"No executor metrics found for stage $stageId and metric $metric.")
184+
ListBuffer.empty[(String, Long)]
185+
}
181186

182187
if (stageExecutorMetricsRaw.isEmpty) {
183188
// Handle gracefully if no data was returned

src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package ch.cern.sparkmeasure
33
import org.apache.spark.sql.{DataFrame, SparkSession}
44
import org.slf4j.LoggerFactory
55

6-
import scala.collection.JavaConverters.mapAsJavaMap
76
import scala.collection.mutable.{LinkedHashMap, ListBuffer}
7+
import scala.jdk.CollectionConverters._
88
import scala.math.max
99

1010
/**
@@ -105,7 +105,7 @@ case class TaskMetrics(sparkSession: SparkSession) {
105105

106106
// Transforms aggregateTaskMetrics output in a Java Map, needed by the Python API
107107
def aggregateTaskMetricsJavaMap(): java.util.Map[String, Long] = {
108-
mapAsJavaMap(aggregateTaskMetrics())
108+
aggregateTaskMetrics().asJava
109109
}
110110

111111
// Custom aggregations and post-processing of metrics data

0 commit comments

Comments
 (0)