Skip to content

Commit 332139c

Browse files
committed
Merge branch 'LucaCanali:master' into issue-67
2 parents db40f38 + eab34df commit 332139c

2 files changed

Lines changed: 4 additions & 4 deletions

File tree

src/main/scala/ch/cern/sparkmeasure/StageMetrics.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ case class StageMetrics(sparkSession: SparkSession) {
224224
// Legacy transformation of data recorded from the custom Stage listener
225225
// into a DataFrame and register it as a view for querying with SQL
226226
def createStageMetricsDF(nameTempView: String = "PerfStageMetrics"): DataFrame = {
227-
import sparkSession.implicits._
228-
val resultDF = listenerStage.stageMetricsData.toSeq.toDF()
227+
// Convert the ListBuffer of StageVals into a DataFrame
228+
val resultDF = sparkSession.createDataFrame(listenerStage.stageMetricsData.toSeq)
229229
resultDF.createOrReplaceTempView(nameTempView)
230230
logger.warn(s"Stage metrics data refreshed into temp view $nameTempView")
231231
resultDF

src/main/scala/ch/cern/sparkmeasure/TaskMetrics.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ case class TaskMetrics(sparkSession: SparkSession) {
131131
// Legacy transformation of data recorded from the custom Stage listener
132132
// into a DataFrame and register it as a view for querying with SQL
133133
def createTaskMetricsDF(nameTempView: String = "PerfTaskMetrics"): DataFrame = {
134-
import sparkSession.implicits._
135-
val resultDF = listenerTask.taskMetricsData.toSeq.toDF()
134+
// Create a DataFrame from the task metrics data
135+
val resultDF = sparkSession.createDataFrame(listenerTask.taskMetricsData.toSeq)
136136
resultDF.createOrReplaceTempView(nameTempView)
137137
logger.warn(s"Stage metrics data refreshed into temp view $nameTempView")
138138
resultDF

0 commit comments

Comments
 (0)