Skip to content

Commit

Permalink
Set kafka output topic as optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume LECLERC committed Apr 25, 2024
1 parent 1f6ad7b commit d457571
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Try
*/
case class KafkaOutput(
brokers: String,
topic: String,
topic: Option[String],
trigger: Option[Trigger],
timeout: Long,
mode: String,
Expand All @@ -42,12 +42,17 @@ case class KafkaOutput(

val queryName = createQueryName()

var fullOptions = options + ("kafka.bootstrap.servers" -> brokers)

fullOptions = topic match {
case Some(kafkaTopic) => options + ("topic" -> kafkaTopic)
case _ => options
}

var streamWriter = data.writeStream
.queryName(queryName)
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("topic", topic)
.options(options)
.options(fullOptions)
.outputMode(mode)

streamWriter = trigger match {
Expand All @@ -68,9 +73,11 @@ case class KafkaOutput(
*/
private[streaming] def createQueryName(): String = {

outputName match {
case Some(name) => s"QN_${name}_${topic}_${java.util.UUID.randomUUID}"
case _ => s"QN_${topic}_${java.util.UUID.randomUUID}"
(outputName, topic) match {
case (Some(name), Some(kafkaTopic)) => s"QN_${name}_${kafkaTopic}_${java.util.UUID.randomUUID}"
case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}"
case (None, Some(kafkaTopic)) => s"QN_${kafkaTopic}_${java.util.UUID.randomUUID}"
case _ => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
}

}
Expand All @@ -89,10 +96,7 @@ object KafkaOutput {
*/
def apply(implicit config: Config): KafkaOutput = {
val brokers = getBroker
val topic = getTopic match {
case Some(topicToUse) if topicToUse.nonEmpty => topicToUse
case _ => throw new IllegalArgumentException("No topic specified for Kafka source")
}
val topic = getTopic

val trigger = getStreamingTrigger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual Some("my-test-kafka-output")
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds")))
}

Expand Down Expand Up @@ -60,7 +60,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
Expand Down Expand Up @@ -95,7 +95,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
Expand Down Expand Up @@ -129,7 +129,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
Expand All @@ -139,17 +139,14 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
kafkaStreamOutput.trigger shouldEqual None
}

"throw an exception given missing topic but pattern" in {
"be initialized without topic" in {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Pattern" -> "test.pattern",
"Mode" -> "append",
"Trigger" -> "AvailableNow",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
Expand All @@ -161,36 +158,18 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
)
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
}

"throw an exception given missing topic but assign" in {
val kafkaStreamOutput = KafkaOutput(config.getConfig("Output"))

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Assign" -> "test.assign",
"Mode" -> "append",
"Duration" -> "60 seconds",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
"\"kafka.sasl.kerberos.service.name\"" -> "kafka"
)
)
)
kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual None
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"kafka.security.protocol" -> "SASL_PLAINTEXT",
"kafka.sasl.kerberos.service.name" -> "kafka"
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
kafkaStreamOutput.trigger shouldEqual None
}

"throw an exception given missing brokers" in {
Expand Down Expand Up @@ -229,7 +208,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
trigger = Some(Trigger.AvailableNow()),
timeout = 0,
mode = "append",
Expand All @@ -247,7 +226,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
trigger = Some(Trigger.AvailableNow()),
timeout = 0,
mode = "append",
Expand All @@ -259,5 +238,41 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
queryName should fullyMatch regex "^QN_myTestOutput_test.topic_" + uuidPattern + "$"

}

"return a query name based on output name" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
trigger = Some(Trigger.AvailableNow()),
timeout = 0,
mode = "append",
outputName = Some("myTestOutput")
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_myTestOutput_" + uuidPattern + "$"

}

"return a query name based without name or topic" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
trigger = Some(Trigger.AvailableNow()),
timeout = 0,
mode = "append",
outputName = None
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_KafkaOutput_" + uuidPattern + "$"

}
}
}
2 changes: 1 addition & 1 deletion docs/content/advanced/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
title: Advanced
layout: default
has_children: true
nav_order: 6
nav_order: 7
---
# Advanced
2 changes: 1 addition & 1 deletion docs/content/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Configuration
layout: default
has_children: true
nav_order: 5
nav_order: 6
---
# Configuration

Expand Down

0 comments on commit d457571

Please sign in to comment.