Skip to content

Commit

Permalink
[GOBBLIN-1876] Kafka source / extractor utility to get a simple name …
Browse files Browse the repository at this point in the history
…for kafka brokers (#3738)

* [GOBBLIN-1876] Kafka source / extractor utility to get a simple name for kafka brokers

* Configuration key was not backward compatible

* Address comments
  • Loading branch information
homatthew authored Aug 15, 2023
1 parent 7026d93 commit 69d7e0f
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,7 @@ public class ConfigurationKeys {
* Kafka job configurations.
*/
public static final String KAFKA_BROKERS = "kafka.brokers";
public static final String KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY = "kafka.brokersToSimpleNameMap";
public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = "kafka.source.work.units.creation.threads";
public static final int KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT = 30;
public static final String KAFKA_SOURCE_SHARE_CONSUMER_CLIENT = "kafka.source.shareConsumerClient";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -25,9 +27,18 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;

import org.apache.gobblin.configuration.State;

import static org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;


public class KafkaCommonUtil {
public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
public static final String MAP_KEY_VALUE_DELIMITER_KEY = "->";
public static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();

public static void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
runWithTimeout(() -> {
Expand Down Expand Up @@ -59,4 +70,17 @@ public static <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit
}
}
}

public static Map<String, String> getKafkaBrokerToSimpleNameMap(State state) {
Preconditions.checkArgument(state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
String.format("Configuration must contain value for %s", KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY));
String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
Map<String, String> kafkaBrokerUriToSimpleName = new HashMap<>();
for (String entry : LIST_SPLITTER.splitToList(mapStr)) {
String[] items = entry.trim().split(MAP_KEY_VALUE_DELIMITER_KEY);
kafkaBrokerUriToSimpleName.put(items[0], items[1]);
}

return kafkaBrokerUriToSimpleName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import lombok.Getter;

import org.apache.gobblin.KafkaCommonUtil;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
Expand All @@ -49,6 +50,8 @@
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;


/**
* An implementation of {@link Extractor} for Apache Kafka. Each {@link KafkaExtractor} processes
Expand Down Expand Up @@ -337,4 +340,24 @@ public void close() throws IOException {
public long getHighWatermark() {
return 0;
}

public static String getKafkaBrokerSimpleName(State state) {
Preconditions.checkArgument(state.contains(ConfigurationKeys.KAFKA_BROKERS), String.format("%s is not defined in"
+ " the configuration.", ConfigurationKeys.KAFKA_BROKERS));
List<String> kafkaBrokerUriList = state.getPropAsList(ConfigurationKeys.KAFKA_BROKERS);
Preconditions.checkArgument(kafkaBrokerUriList.size() == 1,
String.format("The %s only supports having exactly one kafka broker defined for %s. "
+ "This is partially because the watermark implementation (e.g. %s class) does not have a schema that supports writing watermarks that contains offsets "
+ "from multiple brokers in a single job", KafkaExtractor.class.getSimpleName(),
ConfigurationKeys.KAFKA_BROKERS, KafkaStreamingExtractor.KafkaWatermark.class.getName()));

String brokerUri = kafkaBrokerUriList.get(0);
Map<String, String> brokerToSimpleName = KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state);

Preconditions.checkArgument(brokerToSimpleName.get(brokerUri) != null,
String.format("Unable to find simple name for the kafka cluster broker uri in the config. Please check the map "
+ "value of %s. brokerUri=%s, configMapValue=%s", KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName));

return brokerToSimpleName.get(brokerUri);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;


public class KafkaCommonUtilTest {

@Test
public void testGetKafkaBrokerToSimpleNameMap() {
String brokerUri = "kafka.some-identifier.kafka.coloc-123.com:12345";
String simpleName = "some-identifier";

State state = new State();
Assert.assertThrows(IllegalArgumentException.class, () -> KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state));

state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, String.format("%s->%s", brokerUri, simpleName));
Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
ImmutableMap.of(brokerUri, simpleName));

state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
String.format("foobar.com:12345->foobar,%s->%s", brokerUri, simpleName));
Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
ImmutableMap.of(brokerUri, simpleName, "foobar.com:12345", "foobar"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.source.extractor.extract.kafka;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;


public class KafkaExtractorTest {

@Test
public void testGetKafkaBrokerSimpleName() {
State state = new State();
Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
state.setProp(ConfigurationKeys.KAFKA_BROKERS, "");
Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));

final String kafkaBrokerUri = "kafka.broker.uri.com:12345";
final String kafkaBrokerSimpleName = "simple.kafka.name";
state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri);
Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));

state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, String.format("foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));

state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, String.format("%s->%s,foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
Assert.assertEquals(KafkaExtractor.getKafkaBrokerSimpleName(state), kafkaBrokerSimpleName);
}

@Test
public void testSimpleMapKeyIsBackwardCompatible() {
Config cfg = ConfigFactory.empty()
.withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("kafkaBrokerUri"))
.withValue(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
ConfigValueFactory.fromAnyRef("kafkaBrokerUri->simpleName"));

Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS), "kafkaBrokerUri");
Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY), "kafkaBrokerUri->simpleName");
}
}

0 comments on commit 69d7e0f

Please sign in to comment.