Skip to content

Commit 53feb16

Browse files
committed
Minor fixes to InfluxDB and Kafka sinks
1 parent f20c037 commit 53feb16

2 files changed

Lines changed: 14 additions & 10 deletions

File tree

src/main/scala/ch/cern/sparkmeasure/InfluxDBSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package ch.cern.sparkmeasure
22

3-
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, TaskLocality}
3+
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart}
44
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
55
import org.apache.spark.sql.SparkSession
66
import org.apache.spark.SparkConf

src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,19 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
221221
)
222222
}
223223

224-
225-
protected def report[T <: Any](metrics: Map[String, T]): Unit = Try {
226-
ensureProducer()
227-
228-
val str = IOUtils.writeToStringSerializedJSON(metrics)
229-
val message = str.getBytes(StandardCharsets.UTF_8)
230-
producer.send(new ProducerRecord[String, Array[Byte]](topic, message))
231-
}.recover {
232-
case ex: Throwable => logger.error(s"error on reporting metrics to kafka stream, details=${ex.getMessage}", ex)
224+
protected def report[T <: Any](metrics: Map[String, T]): Unit = {
225+
val result: Unit = Try {
226+
ensureProducer()
227+
val str = IOUtils.writeToStringSerializedJSON(metrics)
228+
val message = str.getBytes(StandardCharsets.UTF_8)
229+
producer.send(new ProducerRecord[String, Array[Byte]](topic, message))
230+
}.recover {
231+
case ex: Throwable =>
232+
logger.error(s"Error on reporting metrics to Kafka stream, details=${ex.getMessage}", ex)
233+
() // Explicitly return Unit
234+
}.getOrElse(()) // Ensure Unit is the final type
235+
236+
result // Return Unit
233237
}
234238

235239
private def ensureProducer(): Unit = {

0 commit comments

Comments
 (0)