diff --git a/docs/Flight_recorder_mode_KafkaSink.md b/docs/Flight_recorder_mode_KafkaSink.md index 5252307..3105d10 100644 --- a/docs/Flight_recorder_mode_KafkaSink.md +++ b/docs/Flight_recorder_mode_KafkaSink.md @@ -36,6 +36,8 @@ Configuration - KafkaSink parameters: --conf spark.sparkmeasure.kafkaTopic = Kafka topic Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo Note: the topic will be created if it does not yet exist +--conf spark.sparkmeasure.kafka.* = Other Kafka producer properties (the spark.sparkmeasure.kafka. prefix is stripped before they are passed to the producer) + Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ``` This code depends on "kafka-clients". If you deploy sparkMeasure from maven central, diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index 5a2c807..02208d4 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -27,6 +27,8 @@ import scala.util.Try * example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092 * spark.sparkmeasure.kafkaTopic = Kafka topic * example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo + * spark.sparkmeasure.kafka.* = Other Kafka producer properties (the spark.sparkmeasure.kafka. prefix is stripped before they are passed to the producer) + * example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks * * This code depends on "kafka clients", you may need to add the dependency: * --packages org.apache.kafka:kafka-clients:3.2.1 @@ -39,7 +41,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener { logger.warn("Custom monitoring listener with Kafka sink initializing. 
Now attempting to connect to Kafka topic") // Initialize Kafka connection - val (broker, topic) = Utils.parseKafkaConfig(conf, logger) + val (broker, topic, properties) = Utils.parseKafkaConfig(conf, logger) private var producer: Producer[String, Array[Byte]] = _ var appId: String = SparkSession.getActiveSession match { @@ -248,6 +250,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", classOf[ByteArraySerializer].getName) props.put("client.id", "spark-measure") + properties.foreach{ case (k, v) => props.put(k, v) } producer = new KafkaProducer(props) } ) @@ -343,4 +346,4 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { ) report(point2) } -} \ No newline at end of file +} diff --git a/src/main/scala/ch/cern/sparkmeasure/Utils.scala b/src/main/scala/ch/cern/sparkmeasure/Utils.scala index e5ff6fa..eba8898 100644 --- a/src/main/scala/ch/cern/sparkmeasure/Utils.scala +++ b/src/main/scala/ch/cern/sparkmeasure/Utils.scala @@ -245,7 +245,7 @@ object Utils { influxdbStagemetrics } - def parseKafkaConfig(conf: SparkConf, logger: Logger): (String, String) = { + def parseKafkaConfig(conf: SparkConf, logger: Logger): (String, String, Map[String, String]) = { // handle Kafka broker and topic val broker = conf.get("spark.sparkmeasure.kafkaBroker", "") val topic = conf.get("spark.sparkmeasure.kafkaTopic", "") @@ -255,7 +255,15 @@ object Utils { logger.info(s"Kafka broker: $broker") logger.info(s"Kafka topic: $topic") } - (broker, topic) + + val prefix = "spark.sparkmeasure.kafka." + val kafkaParams: Map[String, String] = conf.getAll + .collect { + case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v + } + .toMap + + (broker, topic, kafkaParams) } def parsePushGatewayConfig(conf: SparkConf, logger: Logger): PushgatewayConfig = {