-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Topic enforcer to generate Strimzi KafkaTopic resources.
- Loading branch information
Showing
4 changed files
with
116 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
95 changes: 95 additions & 0 deletions
95
kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Copyright (C) 2024 Tesla Motors, Inc. All rights reserved. | ||
*/ | ||
|
||
package com.tesla.data.topic.enforcer; | ||
|
||
import com.tesla.data.enforcer.BaseCommand; | ||
|
||
import com.beust.jcommander.Parameter; | ||
import com.beust.jcommander.Parameters; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | ||
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; | ||
import com.google.common.base.CaseFormat; | ||
import com.google.common.hash.Hashing; | ||
import io.strimzi.api.kafka.model.topic.KafkaTopic; | ||
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
@Parameters(commandDescription = "Generate the Strimzi KafkaTopic resources YAML on stdout from the topic config. " + | ||
"Certain information such as topic tags and config comments would be missed.") | ||
public class StrimziCommand extends BaseCommand<ConfiguredTopic> { | ||
|
||
@Parameter( | ||
names = {"--kafka_name", "-k"}, | ||
description = "the name of the Strimzi Kafka cluster to be generated for", | ||
required = true) | ||
protected String kafkaName; | ||
|
||
@Override | ||
public int run() { | ||
List<ConfiguredTopic> configuredTopics = configuredEntities(ConfiguredTopic.class, "topics", "topicsFile"); | ||
|
||
ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() | ||
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, true) | ||
.configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true) | ||
.configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true) | ||
.configure(YAMLGenerator.Feature.INDENT_ARRAYS, false) | ||
.configure(YAMLGenerator.Feature.SPLIT_LINES, false) | ||
); | ||
|
||
try { | ||
for (ConfiguredTopic topic : configuredTopics) { | ||
System.out.println(yamlMapper.writeValueAsString(getKafkaTopic(topic))); | ||
} | ||
} catch (JsonProcessingException e) { | ||
LOG.error("Failed to dump config", e); | ||
return FAILURE; | ||
} | ||
|
||
return SUCCESS; | ||
} | ||
|
||
/** | ||
* Takes the best effort to convert the topic names to a resource name compatible format. We are not aiming at perfect | ||
* correctness for mixed cases. Instead, we take some heuristics to get some good enough results. | ||
*/ | ||
private String toResourceName(String s) { | ||
// If the topic name contains an underscore, we assume it follows underscore case. | ||
if (s.contains("_")) { | ||
return CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_HYPHEN, s.toLowerCase()).replaceAll("[^-.a-z0-9]",""); | ||
} | ||
// We assume the original name is upper camel case and take the best effort, even if it's not. | ||
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, s).replaceAll("[^-.a-z0-9]",""); | ||
} | ||
|
||
private KafkaTopic getKafkaTopic(ConfiguredTopic enforcerTopic) { | ||
// Kubernetes resources follow RFC 1123 naming convention, so we have to remove the unsupported characters. To avoid | ||
// duplications, we are appending a short hash string of the original topic name. | ||
String topicHash = Hashing.sha256().hashString(enforcerTopic.getName(), StandardCharsets.UTF_8) | ||
.toString().substring(0, 6); | ||
// There could be some Kafka topics under the same names but for different Kafka clusters. To avoid ambiguity, | ||
// we prepend the Kafka cluster names in the KafkaTopic metadata name. | ||
String resourceName = kafkaName + "." + toResourceName(enforcerTopic.getName()) + "-" + topicHash; | ||
|
||
return new KafkaTopicBuilder() | ||
.withNewMetadata() | ||
.withName(resourceName) | ||
.withLabels(Map.of("strimzi.io/cluster", kafkaName)) | ||
.endMetadata() | ||
.withNewSpec() | ||
.withTopicName(enforcerTopic.getName()) | ||
.withPartitions(enforcerTopic.getPartitions()) | ||
.withReplicas((int) enforcerTopic.getReplicationFactor()) | ||
.withConfig(new HashMap<>(enforcerTopic.getConfig())) | ||
.endSpec().build(); | ||
} | ||
} | ||
|
||
|