-
Notifications
You must be signed in to change notification settings - Fork 749
[GOBBLIN-1922]Add function in Kafka Source to recompute workUnits for filtered partitions #3798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
ef5a2d2
3f43990
4847bca
fea48cc
0a351e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,10 @@ | |
| package org.apache.gobblin.source.extractor.extract.kafka; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
@@ -29,6 +33,7 @@ | |
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.regex.Pattern; | ||
|
|
||
| import java.util.stream.Collectors; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -60,6 +65,7 @@ | |
| import org.apache.gobblin.metrics.MetricContext; | ||
| import org.apache.gobblin.metrics.event.lineage.LineageInfo; | ||
| import org.apache.gobblin.source.extractor.extract.EventBasedSource; | ||
| import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker; | ||
| import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker; | ||
| import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators; | ||
| import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys; | ||
|
|
@@ -190,10 +196,27 @@ private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<S | |
|
|
||
| @Override | ||
| public List<WorkUnit> getWorkunits(SourceState state) { | ||
| return this.getWorkunitsForFilteredPartitions(state, Optional.absent(), Optional.absent()); | ||
| } | ||
|
|
||
| /** | ||
| * Compute Workunits for Kafka Topics. If filteredTopicPartition present, respect this map and only compute the provided | ||
| * topics and filtered partitions. If not, use state to discover Kafka topics and all available partitions. | ||
| * | ||
| * @param filteredTopicPartition optional parameter to determine if only filtered topic-partitions are needed. | ||
| * @param minContainer give an option to specify a minimum container count. Please be advised that how it being used is | ||
| * determined by the implementation of concrete {@link KafkaWorkUnitPacker} class. | ||
| * | ||
| * TODO: Utilize the minContainer in {@link KafkaTopicGroupingWorkUnitPacker#pack(Map, int)}, as the numContainers variable | ||
| * is not used currently. | ||
| */ | ||
| public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, | ||
| Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) { | ||
| this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class); | ||
| this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); | ||
|
|
||
| Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap(); | ||
| Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>()); | ||
| Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap(); | ||
| if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) { | ||
| String tableTypeStr = | ||
| state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString()); | ||
|
|
@@ -213,18 +236,22 @@ public List<WorkUnit> getWorkunits(SourceState state) { | |
| try { | ||
| Config config = ConfigUtils.propertiesToConfig(state.getProperties()); | ||
| GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver | ||
| .resolveClass( | ||
| state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, | ||
| DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); | ||
| .resolveClass( | ||
| state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, | ||
| DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance(); | ||
|
|
||
| this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config)); | ||
|
|
||
| List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state); | ||
| Collection<KafkaTopic> topics; | ||
| if(filteredTopicPartition.isPresent()) { | ||
| // If filteredTopicPartition present, use it to construct the whitelist pattern while leave blacklist empty | ||
| topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(), | ||
| filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList())); | ||
| } else { | ||
| topics = getValidTopics(getFilteredTopics(state), state); | ||
| } | ||
| this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet()); | ||
|
|
||
| for (String topic : this.topicsToProcess) { | ||
| LOG.info("Discovered topic " + topic); | ||
| } | ||
| Map<String, State> topicSpecificStateMap = | ||
| DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() { | ||
|
|
||
|
|
@@ -234,20 +261,13 @@ public String apply(KafkaTopic topic) { | |
| } | ||
| }), state); | ||
|
|
||
| for (KafkaTopic topic : topics) { | ||
| if (topic.getTopicSpecificState().isPresent()) { | ||
| topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) | ||
| .addAllIfNotExist(topic.getTopicSpecificState().get()); | ||
| } | ||
| } | ||
|
|
||
| int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, | ||
| ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT); | ||
| ExecutorService threadPool = | ||
| Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG))); | ||
|
|
||
| if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT, | ||
| ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { | ||
| ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) { | ||
| this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get(); | ||
| } else { | ||
| // preallocate one client per thread | ||
|
|
@@ -257,32 +277,42 @@ public String apply(KafkaTopic topic) { | |
| Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted(); | ||
|
|
||
| for (KafkaTopic topic : topics) { | ||
| LOG.info("Discovered topic " + topic); | ||
| if (topic.getTopicSpecificState().isPresent()) { | ||
| topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) | ||
| .addAllIfNotExist(topic.getTopicSpecificState().get()); | ||
| } | ||
| Optional<Set<Integer>> partitionIDSet = Optional.absent(); | ||
| if(filteredTopicPartition.isPresent()) { | ||
| List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName())) | ||
| .orElse(new ArrayList<>()); | ||
| partitionIDSet = Optional.of(new HashSet<>(list)); | ||
| LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}", | ||
| topic.getName(), list.size()); | ||
| } | ||
|
|
||
| threadPool.submit( | ||
| new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())), | ||
| workUnits)); | ||
| kafkaTopicWorkunitMap, partitionIDSet)); | ||
| } | ||
|
|
||
| ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS); | ||
| LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(), | ||
| LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(), | ||
| createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS))); | ||
|
|
||
| // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets, | ||
| // but aren't processed). | ||
| createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state); | ||
| //determine the number of mappers | ||
| int maxMapperNum = | ||
| state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS); | ||
| createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state); | ||
|
|
||
| KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext)); | ||
| int numOfMultiWorkunits = maxMapperNum; | ||
| if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) { | ||
| double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits); | ||
| LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize)); | ||
| double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE); | ||
| numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1; | ||
| numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum); | ||
| int numOfMultiWorkunits = minContainer.or(0); | ||
| if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) { | ||
| numOfMultiWorkunits = Math.max(numOfMultiWorkunits, | ||
| calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap)); | ||
| } | ||
| addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap); | ||
| List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits); | ||
|
|
||
| addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap); | ||
| List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this mean if we provide filtered topic partitions, we need to make sure there are only 1 topic? Otherwise how will the minimum container take effect
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the design is to able to handle multiple topics.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure whether you can achieve the goal with this method as for the scenario where we pack all partition in same container, normally they are pretty light in weight.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. take this example of before calling squeezeMultiWorkUnits for mwuGroups, it contains multiWU as: Say we set numContainers as 5, so we can further split the multiWU to:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but you're right in some cases the WU can be very light weighted, so maybe I can combine value of weight and partition count as the queue comparator.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rather suggest to keep the logic there simple but make sure when you call the method, you just call it for one single topic(or even one single failed task) at one time "split" the original work unit.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but anyway, you can do this in a separate PR |
||
| setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys()); | ||
| return workUnitList; | ||
| } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { | ||
|
|
@@ -305,6 +335,7 @@ public String apply(KafkaTopic topic) { | |
| LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| protected void populateClientPool(int count, | ||
|
|
@@ -377,17 +408,37 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit> | |
| } | ||
| } | ||
|
|
||
| //determine the number of mappers/containers for workunit packer | ||
| private int calculateNumMappersForPacker(SourceState state, | ||
| KafkaWorkUnitPacker kafkaWorkUnitPacker, Map<String, List<WorkUnit>> workUnits) { | ||
| int maxMapperNum = | ||
| state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS); | ||
| int numContainers = maxMapperNum; | ||
| if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) { | ||
| double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits); | ||
| LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize)); | ||
| double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE); | ||
| numContainers = (int) (totalEstDataSize / targetMapperSize) + 1; | ||
| numContainers = Math.min(numContainers, maxMapperNum); | ||
| } | ||
| return numContainers; | ||
| } | ||
|
|
||
| /* | ||
| * This function need to be thread safe since it is called in the Runnable | ||
| */ | ||
| private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) { | ||
| private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, | ||
| Optional<State> topicSpecificState, Optional<Set<Integer>> filteredPartitions) { | ||
| Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time(); | ||
| boolean topicQualified = isTopicQualified(topic); | ||
| context.close(); | ||
|
|
||
| List<WorkUnit> workUnits = Lists.newArrayList(); | ||
| List<KafkaPartition> topicPartitions = topic.getPartitions(); | ||
| for (KafkaPartition partition : topicPartitions) { | ||
| if(filteredPartitions.isPresent() && !filteredPartitions.get().contains(partition.getId())) { | ||
| continue; | ||
| } | ||
| WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState); | ||
| if (workUnit != null) { | ||
| // For disqualified topics, for each of its workunits set the high watermark to be the same | ||
|
|
@@ -895,13 +946,20 @@ private class WorkUnitCreator implements Runnable { | |
| private final SourceState state; | ||
| private final Optional<State> topicSpecificState; | ||
| private final Map<String, List<WorkUnit>> allTopicWorkUnits; | ||
| private final Optional<Set<Integer>> filteredPartitionsId; | ||
|
|
||
| WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState, | ||
| Map<String, List<WorkUnit>> workUnits) { | ||
| this(topic, state, topicSpecificState, workUnits, Optional.absent()); | ||
| } | ||
|
|
||
| WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState, | ||
|
hanghangliu marked this conversation as resolved.
|
||
| Map<String, List<WorkUnit>> workUnits, Optional<Set<Integer>> filteredPartitionsId) { | ||
| this.topic = topic; | ||
| this.state = state; | ||
| this.topicSpecificState = topicSpecificState; | ||
| this.allTopicWorkUnits = workUnits; | ||
| this.filteredPartitionsId = filteredPartitionsId; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -917,7 +975,7 @@ public void run() { | |
| } | ||
|
|
||
| this.allTopicWorkUnits.put(this.topic.getName(), | ||
| KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState)); | ||
| KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState, this.filteredPartitionsId)); | ||
| } catch (Throwable t) { | ||
| LOG.error("Caught error in creating work unit for " + this.topic.getName(), t); | ||
| throw new RuntimeException(t); | ||
|
|
@@ -930,4 +988,4 @@ public void run() { | |
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel this might cause problem as we intentionally skip some topic partitions, seems this method will add them back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a condition to check if filteredTopicPartition present. Only invoke the call the it's not present