diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
index 4303c7a..bf69f90 100644
--- a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
@@ -21,7 +21,7 @@ import scala.util.Try
   */
 case class KafkaOutput(
     brokers: String,
-    topic: String,
+    topic: Option[String],
     trigger: Option[Trigger],
     timeout: Long,
     mode: String,
@@ -42,12 +42,17 @@ case class KafkaOutput(
 
     val queryName = createQueryName()
 
+    var fullOptions = options + ("kafka.bootstrap.servers" -> brokers)
+
+    fullOptions = topic match {
+      case Some(kafkaTopic) => fullOptions + ("topic" -> kafkaTopic)
+      case _                => fullOptions
+    }
+
     var streamWriter = data.writeStream
       .queryName(queryName)
       .format("kafka")
-      .option("kafka.bootstrap.servers", brokers)
-      .option("topic", topic)
-      .options(options)
+      .options(fullOptions)
       .outputMode(mode)
 
     streamWriter = trigger match {
@@ -68,9 +73,11 @@ case class KafkaOutput(
 
    */
   private[streaming] def createQueryName(): String = {
-    outputName match {
-      case Some(name) => s"QN_${name}_${topic}_${java.util.UUID.randomUUID}"
-      case _          => s"QN_${topic}_${java.util.UUID.randomUUID}"
+    (outputName, topic) match {
+      case (Some(name), Some(kafkaTopic)) => s"QN_${name}_${kafkaTopic}_${java.util.UUID.randomUUID}"
+      case (Some(name), None)             => s"QN_${name}_${java.util.UUID.randomUUID}"
+      case (None, Some(kafkaTopic))       => s"QN_${kafkaTopic}_${java.util.UUID.randomUUID}"
+      case _                              => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
     }
   }
 
@@ -89,10 +96,7 @@ object KafkaOutput {
    */
   def apply(implicit config: Config): KafkaOutput = {
     val brokers = getBroker
-    val topic = getTopic match {
-      case Some(topicToUse) if topicToUse.nonEmpty => topicToUse
-      case _                                       => throw new IllegalArgumentException("No topic specified for Kafka source")
-    }
+    val topic = getTopic
 
     val trigger = getStreamingTrigger
 
diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala
index 1dd87e6..3f9f8d1 100644
--- a/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutputTest.scala
@@ -30,7 +30,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
 
       kafkaStreamOutput.outputName shouldEqual Some("my-test-kafka-output")
       kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
-      kafkaStreamOutput.topic shouldEqual "test.topic"
+      kafkaStreamOutput.topic shouldEqual Some("test.topic")
 
       kafkaStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds")))
     }
@@ -60,7 +60,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
 
       kafkaStreamOutput.outputName shouldEqual None
       kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
-      kafkaStreamOutput.topic shouldEqual "test.topic"
+      kafkaStreamOutput.topic shouldEqual Some("test.topic")
       kafkaStreamOutput.options shouldEqual Map(
         "failOnDataLoss" -> "false",
         "maxOffsetsPerTrigger" -> "20000000",
@@ -95,7 +95,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
 
       kafkaStreamOutput.outputName shouldEqual None
       kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
-      kafkaStreamOutput.topic shouldEqual "test.topic"
+      kafkaStreamOutput.topic shouldEqual Some("test.topic")
       kafkaStreamOutput.options shouldEqual Map(
         "failOnDataLoss" -> "false",
         "maxOffsetsPerTrigger" -> "20000000",
@@ -129,7 +129,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
 
       kafkaStreamOutput.outputName shouldEqual None
       kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
-      kafkaStreamOutput.topic shouldEqual "test.topic"
+      kafkaStreamOutput.topic shouldEqual Some("test.topic")
       kafkaStreamOutput.options shouldEqual Map(
         "failOnDataLoss" -> "false",
         "maxOffsetsPerTrigger" -> "20000000",
@@ -139,17 +139,14 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
       kafkaStreamOutput.trigger shouldEqual None
     }
 
-    "throw an exception given missing topic but pattern" in {
+    "be initialized without topic" in {
 
       val config = ConfigFactory.parseMap(
         Map(
           "Output" -> Map(
             "Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
-            "Name" -> "my-test-kafka",
             "Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
-            "Pattern" -> "test.pattern",
             "Mode" -> "append",
-            "Trigger" -> "AvailableNow",
             "Timeout" -> "24",
             "Options" -> Map(
               "failOnDataLoss" -> "false",
@@ -161,36 +158,18 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
         )
       )
 
-      intercept[IllegalArgumentException] {
-        KafkaOutput(config.getConfig("Output"))
-      }
-    }
-
-    "throw an exception given missing topic but assign" in {
+      val kafkaStreamOutput = KafkaOutput(config.getConfig("Output"))
 
-      val config = ConfigFactory.parseMap(
-        Map(
-          "Output" -> Map(
-            "Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
-            "Name" -> "my-test-kafka",
-            "Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
-            "Assign" -> "test.assign",
-            "Mode" -> "append",
-            "Duration" -> "60 seconds",
-            "Timeout" -> "24",
-            "Options" -> Map(
-              "failOnDataLoss" -> "false",
-              "maxOffsetsPerTrigger" -> "20000000",
-              "\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
-              "\"kafka.sasl.kerberos.service.name\"" -> "kafka"
-            )
-          )
-        )
+      kafkaStreamOutput.outputName shouldEqual None
+      kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
+      kafkaStreamOutput.topic shouldEqual None
+      kafkaStreamOutput.options shouldEqual Map(
+        "failOnDataLoss" -> "false",
+        "maxOffsetsPerTrigger" -> "20000000",
+        "kafka.security.protocol" -> "SASL_PLAINTEXT",
+        "kafka.sasl.kerberos.service.name" -> "kafka"
       )
-
-      intercept[IllegalArgumentException] {
-        KafkaOutput(config.getConfig("Output"))
-      }
+      kafkaStreamOutput.trigger shouldEqual None
     }
 
     "throw an exception given missing brokers" in {
@@ -229,7 +208,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
       val kafkaOutput =
         KafkaOutput(
           brokers = "bktv001:9000, bktv002.amadeus.net:8000",
-          topic = "test.topic",
+          topic = Some("test.topic"),
           trigger = Some(Trigger.AvailableNow()),
           timeout = 0,
           mode = "append",
@@ -247,7 +226,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
       val kafkaOutput =
         KafkaOutput(
           brokers = "bktv001:9000, bktv002.amadeus.net:8000",
-          topic = "test.topic",
+          topic = Some("test.topic"),
          trigger = Some(Trigger.AvailableNow()),
           timeout = 0,
           mode = "append",
@@ -259,5 +238,41 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
       queryName should fullyMatch regex "^QN_myTestOutput_test.topic_" + uuidPattern + "$"
     }
+
+    "return a query name based on output name" in {
+
+      val kafkaOutput =
+        KafkaOutput(
+          brokers = "bktv001:9000, bktv002.amadeus.net:8000",
+          topic = None,
+          trigger = Some(Trigger.AvailableNow()),
+          timeout = 0,
+          mode = "append",
+          outputName = Some("myTestOutput")
+        )
+
+      val queryName = kafkaOutput.createQueryName()
+
+      queryName should fullyMatch regex "^QN_myTestOutput_" + uuidPattern + "$"
+
+    }
+
+    "return a query name without output name or topic" in {
+
+      val kafkaOutput =
+        KafkaOutput(
+          brokers = "bktv001:9000, bktv002.amadeus.net:8000",
+          topic = None,
+          trigger = Some(Trigger.AvailableNow()),
+          timeout = 0,
+          mode = "append",
+          outputName = None
+        )
+
+      val queryName = kafkaOutput.createQueryName()
+
+      queryName should fullyMatch regex "^QN_KafkaOutput_" + uuidPattern + "$"
+
+    }
 
   }
 }
diff --git a/docs/content/advanced/advanced.md b/docs/content/advanced/advanced.md
index 598d55e..0aebf24 100644
--- a/docs/content/advanced/advanced.md
+++ b/docs/content/advanced/advanced.md
@@ -2,6 +2,6 @@
 title: Advanced
 layout: default
 has_children: true
-nav_order: 6
+nav_order: 7
 ---
 # Advanced
diff --git a/docs/content/configuration/configuration.md b/docs/content/configuration/configuration.md
index f95e33c..0dae97f 100644
--- a/docs/content/configuration/configuration.md
+++ b/docs/content/configuration/configuration.md
@@ -2,7 +2,7 @@
 title: Configuration
 layout: default
 has_children: true
-nav_order: 5
+nav_order: 6
 ---
 
 # Configuration
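Context for the change above: making `topic` optional relies on standard Spark Kafka sink behaviour, where the destination is read from a `topic` column in the dataset whenever the `topic` write option is not set, and the `topic` option overrides that column when it is set. A minimal sketch in plain Spark Structured Streaming, not taken from this repository (the `PerRecordTopicSketch` object, the `rate` source, the topic names and the checkpoint path are illustrative assumptions):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct, to_json, when}

object PerRecordTopicSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("per-record-topic-sketch").getOrCreate()

    // Any streaming DataFrame works; the built-in rate source keeps the sketch self-contained.
    val events: DataFrame = spark.readStream.format("rate").load()

    // Each row carries its own destination in a "topic" column, next to the usual key/value columns.
    val toKafka = events.select(
      when(col("value") % 2 === 0, lit("events.even")).otherwise(lit("events.odd")).as("topic"),
      col("value").cast("string").as("key"),
      to_json(struct(col("timestamp"), col("value"))).as("value")
    )

    // No "topic" option is set, so the Kafka sink routes every row to the topic named in its
    // "topic" column - the same situation as a KafkaOutput configured without a Topic entry.
    val query = toKafka.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "bktv001:9000")
      .option("checkpointLocation", "/tmp/checkpoints/per-record-topic")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

When the configuration does provide a `Topic`, the patched `write` path still sets the `topic` option, so the previous single-topic behaviour is preserved.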