From ef5a2d281d29e31ff8a1700804e48b39c55d2c48 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Wed, 11 Oct 2023 11:42:13 -0700 Subject: [PATCH 1/5] add function in Kafka Source to recompute workUnits for filtered partitions --- .../AbstractBaseKafkaConsumerClient.java | 14 -- .../client/GobblinKafkaConsumerClient.java | 17 ++ .../extractor/extract/kafka/KafkaSource.java | 146 ++++++++++++++++-- 3 files changed, 151 insertions(+), 26 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java index 3933adb3c51..162f0a3ff02 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java @@ -157,18 +157,4 @@ protected String canonicalMetricName(String metricGroup, Collection metr return processedName; } - - /** - * Get a list of all kafka topics - */ - public abstract List getTopics(); - - /** - * Get a list of {@link KafkaTopic} with the provided topic names. - * The default implementation lists all the topics. - * Implementations of this class can improve this method. 
- */ - public Collection getTopics(Collection topics) { - return getTopics(); - } } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java index c75408e8f83..08a0a8ef976 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; +import java.util.stream.Collectors; import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; @@ -169,6 +170,22 @@ default long committed(KafkaPartition partition) { public default void assignAndSeek(List topicPartitions, Map topicWatermarksMap) { return; } + /** + * Get a list of all kafka topics + */ + public default List getTopics() {return Collections.emptyList();}; + + /** + * Get a list of {@link KafkaTopic} with the provided topic names. + * The default implementation firstly retrieve all the topics, then filter by the provided list. + * Implementations of this class can improve this method. 
+ */ + public default Collection getTopics(Collection topics) { + return getTopics().stream() + .filter(list-> topics.contains(list)) + .collect(Collectors.toList()); + } + /** * A factory to create {@link GobblinKafkaConsumerClient}s */ diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 0bca916a704..c6dd212161a 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -18,6 +18,7 @@ package org.apache.gobblin.source.extractor.extract.kafka; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +62,7 @@ import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.extract.EventBasedSource; +import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker; import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker; import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators; import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys; @@ -269,17 +272,11 @@ public String apply(KafkaTopic topic) { // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, // but aren't processed). 
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state); - //determine the number of mappers - int maxMapperNum = - state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS); + KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); - int numOfMultiWorkunits = maxMapperNum; - if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) { - double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits); - LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize)); - double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE); - numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1; - numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum); + int numOfMultiWorkunits= 0; + if(!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) { + numOfMultiWorkunits = calculateNumMappersForPacker(state, kafkaWorkUnitPacker, workUnits); } addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap); List workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits); @@ -307,6 +304,96 @@ public String apply(KafkaTopic topic) { } } + public List getWorkunitsForFilteredPartitions(SourceState state, Map> filteredTopicPartitionMap, int minContainer) { + Map> kafkaTopicWorkunitMap = Maps.newConcurrentMap(); + Collection topics; + try { + Config config = ConfigUtils.propertiesToConfig(state.getProperties()); + GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver + .resolveClass( + state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, + DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); + + this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config)); + topics = this.kafkaConsumerClient.get().getTopics(filteredTopicPartitionMap.keySet()); + + Map 
topicSpecificStateMap = + DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function() { + + @Override + public String apply(KafkaTopic topic) { + return topic.getName(); + } + }), state); + + int numOfThreads = Math.min(topics.size(), state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, + ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT)); + ExecutorService threadPool = + Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG))); + + if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT, + ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { + this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get(); + } else { + // preallocate one client per thread + populateClientPool(3, kafkaConsumerClientFactory, config); + } + + Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted(); + for (KafkaTopic topic : topics) { + LOG.info("Trying to re-compute the workunit for topic {} with filtered partitions: {}", + topic.getName(), filteredTopicPartitionMap.get(topic.getName())); + if (topic.getTopicSpecificState().isPresent()) { + topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) + .addAllIfNotExist(topic.getTopicSpecificState().get()); + } + Set partitionIDSet = filteredTopicPartitionMap.get(topic.getName()) + .stream().map(Integer::parseInt).collect(Collectors.toSet()); + threadPool.submit( + new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), + kafkaTopicWorkunitMap, partitionIDSet)); + } + ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS); + LOG.info("Created workunits for {} topics in {} seconds", kafkaTopicWorkunitMap.size(), + createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)); + + // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, + // but aren't processed). 
+ createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); + KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); + int numOfMultiWorkunits = minContainer; + if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) { + numOfMultiWorkunits = Math.max(numOfMultiWorkunits, + calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap)); + } + addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap); + // TODO: numOfMultiWorkunits is currently not used in the #KafkaTopicGroupingWorkUnitPacker.pack() method. + // Next step is utilize this number to determine the container count + List workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits); + setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys()); + return workUnitList; + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Checked exception caught", e); + } catch (Throwable t) { + throw new RuntimeException("Unexpected throwable caught, ", t); + } finally { + try { + GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get(); + if (consumerClient != null) { + consumerClient.close(); + } + // cleanup clients from pool + for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) { + client.close(); + } + } catch (Throwable t) { + //Swallow any exceptions in the finally{..} block to allow potential exceptions from the main try{..} block to be + //propagated + LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t); + } + } + } + protected void populateClientPool(int count, GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory, Config config) { @@ -377,10 +464,31 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map } } + //determine the number of mappers/containers for workunit packer + private int 
calculateNumMappersForPacker(SourceState state, + KafkaWorkUnitPacker kafkaWorkUnitPacker, Map> workUnits) { + int maxMapperNum = + state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS); + int numContainers = maxMapperNum; + if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) { + double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits); + LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize)); + double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE); + numContainers = (int) (totalEstDataSize / targetMapperSize) + 1; + numContainers = Math.min(numContainers, maxMapperNum); + } + return numContainers; + } + /* * This function need to be thread safe since it is called in the Runnable */ private List getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional topicSpecificState) { + return this.getWorkUnitsForTopic(topic, state, topicSpecificState, Optional.absent()); + } + + private List getWorkUnitsForTopic(KafkaTopic topic, SourceState state, + Optional topicSpecificState, Optional> filteredPartitions) { Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time(); boolean topicQualified = isTopicQualified(topic); context.close(); @@ -388,6 +496,9 @@ private List getWorkUnitsForTopic(KafkaTopic topic, SourceState state, List workUnits = Lists.newArrayList(); List topicPartitions = topic.getPartitions(); for (KafkaPartition partition : topicPartitions) { + if(filteredPartitions.isPresent() && !filteredPartitions.get().contains(partition.getId())) { + continue; + } WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState); if (workUnit != null) { // For disqualified topics, for each of its workunits set the high watermark to be the same @@ -895,6 +1006,7 @@ private class WorkUnitCreator implements Runnable { private final SourceState state; private final Optional 
topicSpecificState; private final Map> allTopicWorkUnits; + private final Optional> filteredPartitionsId; WorkUnitCreator(KafkaTopic topic, SourceState state, Optional topicSpecificState, Map> workUnits) { @@ -902,6 +1014,16 @@ private class WorkUnitCreator implements Runnable { this.state = state; this.topicSpecificState = topicSpecificState; this.allTopicWorkUnits = workUnits; + this.filteredPartitionsId = Optional.absent(); + } + + WorkUnitCreator(KafkaTopic topic, SourceState state, Optional topicSpecificState, + Map> workUnits, Set filteredPartitionsId) { + this.topic = topic; + this.state = state; + this.topicSpecificState = topicSpecificState; + this.allTopicWorkUnits = workUnits; + this.filteredPartitionsId = Optional.of(filteredPartitionsId); } @Override @@ -917,7 +1039,7 @@ public void run() { } this.allTopicWorkUnits.put(this.topic.getName(), - KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState)); + KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState, this.filteredPartitionsId)); } catch (Throwable t) { LOG.error("Caught error in creating work unit for " + this.topic.getName(), t); throw new RuntimeException(t); @@ -930,4 +1052,4 @@ public void run() { } } } -} +} \ No newline at end of file From 3f4399004499767502639a705a2abde60676fc88 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Thu, 19 Oct 2023 11:40:46 -0700 Subject: [PATCH 2/5] address comments --- .../AbstractBaseKafkaConsumerClient.java | 13 ++ .../client/GobblinKafkaConsumerClient.java | 17 -- .../extractor/extract/kafka/KafkaSource.java | 172 ++++++------------ .../extract/kafka/KafkaSourceTest.java | 114 +++++++++++- 4 files changed, 176 insertions(+), 140 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java 
index 162f0a3ff02..18cc75c3095 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java @@ -157,4 +157,17 @@ protected String canonicalMetricName(String metricGroup, Collection metr return processedName; } + /** + * Get a list of all kafka topics + */ + public abstract List getTopics(); + + /** + * Get a list of {@link KafkaTopic} with the provided topic names. + * The default implementation lists all the topics. + * Implementations of this class can improve this method. + */ + public Collection getTopics(Collection topics) { + return getTopics(); + } } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java index 08a0a8ef976..c75408e8f83 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java @@ -28,7 +28,6 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; -import java.util.stream.Collectors; import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; @@ -170,22 +169,6 @@ default long committed(KafkaPartition partition) { public default void assignAndSeek(List topicPartitions, Map topicWatermarksMap) { return; } - /** - * Get a list of all kafka topics - */ - public default List getTopics() {return Collections.emptyList();}; - - /** - * Get a list of {@link KafkaTopic} with the provided topic 
names. - * The default implementation firstly retrieve all the topics, then filter by the provided list. - * Implementations of this class can improve this method. - */ - public default Collection getTopics(Collection topics) { - return getTopics().stream() - .filter(list-> topics.contains(list)) - .collect(Collectors.toList()); - } - /** * A factory to create {@link GobblinKafkaConsumerClient}s */ diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index c6dd212161a..f0df0817196 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,8 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; -import java.util.stream.Collectors; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,10 +196,27 @@ private void setLimiterReportKeyListToWorkUnits(List workUnits, List getWorkunits(SourceState state) { + return this.getWorkunitsForFilteredPartitions(state, Optional.absent(), Optional.absent()); + } + + /** + * Compute Workunits for Kafka Topics. If filteredTopicPartition present, respect this map and only compute the provided + * topics and filtered partitions. If not, use state to discover Kafka topics and all available partitions. 
+ * + * @param filteredTopicPartition optional parameter to determine if only filtered topic-partitions are needed. + * @param minContainer give an option to specify a minimum container count. Please be advised that how it being used is + * determined by the implementation of concrete {@link KafkaWorkUnitPacker} class. + * + * TODO: Utilize the minContainer in {@link KafkaTopicGroupingWorkUnitPacker#pack(Map, int)}, as the numContainers variable + * is not used currently. + */ + public List getWorkunitsForFilteredPartitions(SourceState state, + Optional>> filteredTopicPartition, Optional minContainer) { this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class); this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); - Map> workUnits = Maps.newConcurrentMap(); + Map> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>()); + Map> kafkaTopicWorkunitMap = Maps.newConcurrentMap(); if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) { String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString()); @@ -216,18 +236,22 @@ public List getWorkunits(SourceState state) { try { Config config = ConfigUtils.propertiesToConfig(state.getProperties()); GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver - .resolveClass( - state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, - DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); + .resolveClass( + state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, + DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config)); - List topics = getValidTopics(getFilteredTopics(state), state); + Collection topics; + if(filteredTopicPartition.isPresent()) { + // If filteredTopicPartition present, use it to construct the whitelist pattern while leave 
blacklist empty + topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(), + filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList())); + } else { + topics = getValidTopics(getFilteredTopics(state), state); + } this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet()); - for (String topic : this.topicsToProcess) { - LOG.info("Discovered topic " + topic); - } Map topicSpecificStateMap = DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function() { @@ -237,20 +261,13 @@ public String apply(KafkaTopic topic) { } }), state); - for (KafkaTopic topic : topics) { - if (topic.getTopicSpecificState().isPresent()) { - topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) - .addAllIfNotExist(topic.getTopicSpecificState().get()); - } - } - int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT); ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG))); if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT, - ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { + ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get(); } else { // preallocate one client per thread @@ -260,115 +277,41 @@ public String apply(KafkaTopic topic) { Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted(); for (KafkaTopic topic : topics) { - threadPool.submit( - new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), - workUnits)); - } - - ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS); - LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(), - 
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS))); - - // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, - // but aren't processed). - createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state); - - KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); - int numOfMultiWorkunits= 0; - if(!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) { - numOfMultiWorkunits = calculateNumMappersForPacker(state, kafkaWorkUnitPacker, workUnits); - } - addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap); - List workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits); - setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys()); - return workUnitList; - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new RuntimeException("Checked exception caught", e); - } catch (Throwable t) { - throw new RuntimeException("Unexpected throwable caught, ", t); - } finally { - try { - GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get(); - if (consumerClient != null) { - consumerClient.close(); - } - // cleanup clients from pool - for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) { - client.close(); - } - } catch (Throwable t) { - //Swallow any exceptions in the finally{..} block to allow potential exceptions from the main try{..} block to be - //propagated - LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t); - } - } - } - - public List getWorkunitsForFilteredPartitions(SourceState state, Map> filteredTopicPartitionMap, int minContainer) { - Map> kafkaTopicWorkunitMap = Maps.newConcurrentMap(); - Collection topics; - try { - Config config = ConfigUtils.propertiesToConfig(state.getProperties()); - GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver - 
.resolveClass( - state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, - DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); - - this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config)); - topics = this.kafkaConsumerClient.get().getTopics(filteredTopicPartitionMap.keySet()); - - Map topicSpecificStateMap = - DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function() { - - @Override - public String apply(KafkaTopic topic) { - return topic.getName(); - } - }), state); - - int numOfThreads = Math.min(topics.size(), state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, - ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT)); - ExecutorService threadPool = - Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG))); - - if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT, - ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { - this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get(); - } else { - // preallocate one client per thread - populateClientPool(3, kafkaConsumerClientFactory, config); - } - - Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted(); - for (KafkaTopic topic : topics) { - LOG.info("Trying to re-compute the workunit for topic {} with filtered partitions: {}", - topic.getName(), filteredTopicPartitionMap.get(topic.getName())); + LOG.info("Discovered topic " + topic); if (topic.getTopicSpecificState().isPresent()) { topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) .addAllIfNotExist(topic.getTopicSpecificState().get()); } - Set partitionIDSet = filteredTopicPartitionMap.get(topic.getName()) - .stream().map(Integer::parseInt).collect(Collectors.toSet()); + Optional> partitionIDSet = Optional.absent(); + if(filteredTopicPartition.isPresent()) { + List list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName())) + 
.orElse(new ArrayList<>()); + partitionIDSet = Optional.of(new HashSet<>(list)); + LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}", + topic.getName(), list.size()); + } + threadPool.submit( new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), kafkaTopicWorkunitMap, partitionIDSet)); } + ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS); - LOG.info("Created workunits for {} topics in {} seconds", kafkaTopicWorkunitMap.size(), - createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)); + LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(), + createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS))); // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, // but aren't processed). createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); + KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); - int numOfMultiWorkunits = minContainer; + int numOfMultiWorkunits = minContainer.or(0); if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) { numOfMultiWorkunits = Math.max(numOfMultiWorkunits, calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap)); } + addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap); - // TODO: numOfMultiWorkunits is currently not used in the #KafkaTopicGroupingWorkUnitPacker.pack() method. 
- // Next step is utilize this number to determine the container count List workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits); setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys()); return workUnitList; @@ -392,6 +335,7 @@ public String apply(KafkaTopic topic) { LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t); } } + } protected void populateClientPool(int count, @@ -483,10 +427,6 @@ private int calculateNumMappersForPacker(SourceState state, /* * This function need to be thread safe since it is called in the Runnable */ - private List getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional topicSpecificState) { - return this.getWorkUnitsForTopic(topic, state, topicSpecificState, Optional.absent()); - } - private List getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional topicSpecificState, Optional> filteredPartitions) { Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time(); @@ -1010,20 +950,16 @@ private class WorkUnitCreator implements Runnable { WorkUnitCreator(KafkaTopic topic, SourceState state, Optional topicSpecificState, Map> workUnits) { - this.topic = topic; - this.state = state; - this.topicSpecificState = topicSpecificState; - this.allTopicWorkUnits = workUnits; - this.filteredPartitionsId = Optional.absent(); + this(topic, state, topicSpecificState, workUnits, Optional.absent()); } WorkUnitCreator(KafkaTopic topic, SourceState state, Optional topicSpecificState, - Map> workUnits, Set filteredPartitionsId) { + Map> workUnits, Optional> filteredPartitionsId) { this.topic = topic; this.state = state; this.topicSpecificState = topicSpecificState; this.allTopicWorkUnits = workUnits; - this.filteredPartitionsId = Optional.of(filteredPartitionsId); + this.filteredPartitionsId = filteredPartitionsId; } @Override diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java index c26872e1ce3..fff8581fc0b 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java @@ -17,17 +17,27 @@ package org.apache.gobblin.source.extractor.extract.kafka; +import com.google.common.base.Optional; +import com.typesafe.config.Config; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator; import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators; +import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker; +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; import org.testng.Assert; import org.testng.annotations.Test; @@ -38,8 +48,54 @@ import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.util.DatasetFilterUtils; +import static org.apache.gobblin.source.extractor.extract.kafka.KafkaSource.*; + public class KafkaSourceTest { + private static List testTopics =Arrays.asList( + "topic1", "topic2", "topic3"); + + @Test + public void testGetWorkunits() { + TestKafkaClient 
testKafkaClient = new TestKafkaClient(); + testKafkaClient.testTopics = testTopics; + SourceState state = new SourceState(); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "TestPath"); + state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE, KafkaWorkUnitPacker.PackerType.CUSTOM); + state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE, "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker"); + state.setProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, "MockTestKafkaConsumerClientFactory"); + TestKafkaSource testKafkaSource = new TestKafkaSource(testKafkaClient); + List workUnits = testKafkaSource.getWorkunits(state); + + validatePartitionNumWithinWorkUnits(workUnits, 48); + + } + + @Test + public void testGetWorkunitsForFilteredPartitions() { + TestKafkaClient testKafkaClient = new TestKafkaClient(); + List allTopics = testTopics; + Map> filteredTopicPartitionMap = new HashMap<>(); + filteredTopicPartitionMap.put(allTopics.get(0), new LinkedList<>()); + filteredTopicPartitionMap.put(allTopics.get(1), new LinkedList<>()); + filteredTopicPartitionMap.put(allTopics.get(2), new LinkedList<>()); + filteredTopicPartitionMap.get(allTopics.get(0)).addAll(Arrays.asList(0, 11)); + filteredTopicPartitionMap.get(allTopics.get(1)).addAll(Arrays.asList(2, 8, 10)); + filteredTopicPartitionMap.get(allTopics.get(2)).addAll(Arrays.asList(1, 3, 5, 7)); + + testKafkaClient.testTopics = allTopics; + SourceState state = new SourceState(); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "TestPath"); + state.setProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, "MockTestKafkaConsumerClientFactory"); + TestKafkaSource testKafkaSource = new TestKafkaSource(testKafkaClient); + List workUnits = testKafkaSource.getWorkunitsForFilteredPartitions(state, Optional.of(filteredTopicPartitionMap), Optional.of(3)); + validatePartitionNumWithinWorkUnits(workUnits, 9); + + 
state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE, KafkaWorkUnitPacker.PackerType.CUSTOM); + state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE, "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker"); + workUnits = testKafkaSource.getWorkunitsForFilteredPartitions(state, Optional.of(filteredTopicPartitionMap), Optional.of(1)); + validatePartitionNumWithinWorkUnits(workUnits, 9); + } @Test public void testGetFilteredTopics() { @@ -89,12 +145,60 @@ public void testTopicValidators() { toKafkaTopicList(allTopics.subList(0, 3)))); } - public List toKafkaTopicList(List topicNames) { - return topicNames.stream().map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList()); + public static List creatPartitions(String topicName, int partitionNum) { + List partitions = new ArrayList<>(partitionNum); + for(int i = 0; i < partitionNum; i++ ) { + partitions.add(new KafkaPartition.Builder().withTopicName(topicName).withId(i).withLeaderHostAndPort("test").withLeaderId(1).build()); + } + return partitions; + } + + public static List getPartitionFromWorkUnit(WorkUnit workUnit) { + List topicPartitions = new ArrayList<>(); + if(workUnit instanceof MultiWorkUnit) { + for(WorkUnit wu : ((MultiWorkUnit) workUnit).getWorkUnits()) { + topicPartitions.addAll(getPartitionFromWorkUnit(wu)); + } + }else { + int i = 0; + String partitionIdProp = KafkaSource.PARTITION_ID + "." + i; + while (workUnit.getProp(partitionIdProp) != null) { + int partitionId = workUnit.getPropAsInt(partitionIdProp); + KafkaPartition topicPartition = + new KafkaPartition.Builder().withTopicName(workUnit.getProp(KafkaSource.TOPIC_NAME)).withId(partitionId).build(); + topicPartitions.add(topicPartition); + i++; + partitionIdProp = KafkaSource.PARTITION_ID + "." 
+ i; + } + } + return topicPartitions; + } + + + public static List toKafkaTopicList(List topicNames) { + return topicNames.stream().map(topicName -> new KafkaTopic(topicName, creatPartitions(topicName, 16))).collect(Collectors.toList()); + } + + private void validatePartitionNumWithinWorkUnits(List workUnits, int expectPartitionNum) { + List partitionList = new ArrayList<>(); + for(WorkUnit workUnit : workUnits) { + partitionList.addAll(getPartitionFromWorkUnit(workUnit)); + } + Assert.assertEquals(partitionList.size(), expectPartitionNum); + } + + @Alias("MockTestKafkaConsumerClientFactory") + public static class MockTestKafkaConsumerClientFactory + implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory { + + @Override + public GobblinKafkaConsumerClient create(Config config) { + return new TestKafkaClient(); + } } - private class TestKafkaClient implements GobblinKafkaConsumerClient { - List testTopics; + public static class TestKafkaClient implements GobblinKafkaConsumerClient { + List testTopics = KafkaSourceTest.testTopics; @Override public List getFilteredTopics(List blacklist, List whitelist) { From 4847bca0e958f1d5ff9d520f759fd831c3278c36 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Wed, 25 Oct 2023 13:30:02 -0700 Subject: [PATCH 3/5] set default min container value to 1 --- .../gobblin/source/extractor/extract/kafka/KafkaSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index f0df0817196..1513ac00906 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -305,7 +305,7 @@ 
public String apply(KafkaTopic topic) { createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); - int numOfMultiWorkunits = minContainer.or(0); + int numOfMultiWorkunits = minContainer.or(1); if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) { numOfMultiWorkunits = Math.max(numOfMultiWorkunits, calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap)); From fea48ccc8e4b3f2c0291148f5c935c80ac3a8178 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Wed, 25 Oct 2023 13:45:50 -0700 Subject: [PATCH 4/5] add condition when create empty wu --- .../gobblin/source/extractor/extract/kafka/KafkaSource.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 1513ac00906..662df4454bc 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -301,8 +301,10 @@ public String apply(KafkaTopic topic) { createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS))); // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, - // but aren't processed). - createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); + // but aren't processed). 
When filteredTopicPartition is present, only the filtered topic-partitions are needed, so skip this call + if(filteredTopicPartition.isPresent()) { + createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); + } KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); int numOfMultiWorkunits = minContainer.or(1); From 0a351e2b36dd6842c6b373f3088ed56b9436e100 Mon Sep 17 00:00:00 2001 From: hanghangliu Date: Wed, 25 Oct 2023 13:54:38 -0700 Subject: [PATCH 5/5] update the condition --- .../gobblin/source/extractor/extract/kafka/KafkaSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 662df4454bc..80ef8c09d5e 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -302,7 +302,7 @@ public String apply(KafkaTopic topic) { // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, // but aren't processed). When filteredTopicPartition is present, only the filtered topic-partitions are needed, so skip this call - if(filteredTopicPartition.isPresent()) { + if(!filteredTopicPartition.isPresent()) { createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); }