Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ protected String canonicalMetricName(String metricGroup, Collection<String> metr
return processedName;
}


/**
* Get a list of all kafka topics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package org.apache.gobblin.source.extractor.extract.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -29,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,6 +65,7 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
Expand Down Expand Up @@ -190,10 +196,27 @@ private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<S

@Override
public List<WorkUnit> getWorkunits(SourceState state) {
return this.getWorkunitsForFilteredPartitions(state, Optional.absent(), Optional.absent());
}

/**
* Compute Workunits for Kafka Topics. If filteredTopicPartition present, respect this map and only compute the provided
* topics and filtered partitions. If not, use state to discover Kafka topics and all available partitions.
*
* @param filteredTopicPartition optional parameter to determine if only filtered topic-partitions are needed.
* @param minContainer give an option to specify a minimum container count. Please be advised that how it being used is
* determined by the implementation of concrete {@link KafkaWorkUnitPacker} class.
*
* TODO: Utilize the minContainer in {@link KafkaTopicGroupingWorkUnitPacker#pack(Map, int)}, as the numContainers variable
* is not used currently.
*/
public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) {
this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>());
Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr =
state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
Expand All @@ -213,18 +236,22 @@ public List<WorkUnit> getWorkunits(SourceState state) {
try {
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state);
Collection<KafkaTopic> topics;
if(filteredTopicPartition.isPresent()) {
// If filteredTopicPartition present, use it to construct the whitelist pattern while leave blacklist empty
topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
} else {
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

for (String topic : this.topicsToProcess) {
LOG.info("Discovered topic " + topic);
}
Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {

Expand All @@ -234,20 +261,13 @@ public String apply(KafkaTopic topic) {
}
}), state);

for (KafkaTopic topic : topics) {
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
}

int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
Expand All @@ -257,32 +277,44 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
LOG.info("Discovered topic " + topic);
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
Optional<Set<Integer>> partitionIDSet = Optional.absent();
if(filteredTopicPartition.isPresent()) {
List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
.orElse(new ArrayList<>());
partitionIDSet = Optional.of(new HashSet<>(list));
LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
topic.getName(), list.size());
}

threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
kafkaTopicWorkunitMap, partitionIDSet));
}

ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
// but aren't processed). When filteredTopicPartition present, only filtered topic-partitions are needed so skip this call
if(!filteredTopicPartition.isPresent()) {
createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
}

KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
int numOfMultiWorkunits = minContainer.or(1);
if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
}
addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);

addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if we provide filtered topic partitions, we need to make sure there is only 1 topic? Otherwise, how will the minimum container count take effect?

Copy link
Copy Markdown
Contributor Author

@hanghangliu hanghangliu Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the design is to be able to handle multiple topics.
My idea is that in KafkaTopicGroupingWorkUnitPacker, before squeezing all MultiWUs, we construct a priority queue based on WU weight and keep splitting MultiWorkUnits until the minContainer count is reached or we can't split anymore (each MultiWU only contains 1 partition).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether you can achieve the goal with this method: in the scenario where we pack all partitions in the same container, they are normally pretty light in weight.

Copy link
Copy Markdown
Contributor Author

@hanghangliu hanghangliu Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take this example: before calling squeezeMultiWorkUnits for mwuGroups, it contains MultiWUs as:
topicA: partition 0-7
topicA: partition 8-15
topicB: partition 0-7

Say we set numContainers as 5, so we can further split the multiWU to:
topicA: partition 0-3
topicA: partition 4-7
topicA: partition 8-15
topicB: partition 0-3
topicB: partition 4-7
Then we squeeze them into 5 WUs, which will result in 5 containers. Do you think this process makes sense?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you're right that in some cases the WU can be very lightweight, so maybe I can combine the weight value and partition count as the queue comparator.
Furthermore, if the top of the pq only contains 1 partition, we can keep iterating the queue until we find one that contains multiple partitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather suggest keeping the logic there simple, but make sure that when you call the method, you call it for one single topic (or even one single failed task) at a time to "split" the original work unit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but anyway, you can do this in a separate PR

setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
Expand All @@ -305,6 +337,7 @@ public String apply(KafkaTopic topic) {
LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t);
}
}

}

protected void populateClientPool(int count,
Expand Down Expand Up @@ -377,17 +410,37 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>
}
}

//determine the number of mappers/containers for workunit packer
private int calculateNumMappersForPacker(SourceState state,
KafkaWorkUnitPacker kafkaWorkUnitPacker, Map<String, List<WorkUnit>> workUnits) {
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
int numContainers = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numContainers = (int) (totalEstDataSize / targetMapperSize) + 1;
numContainers = Math.min(numContainers, maxMapperNum);
}
return numContainers;
}

/*
* This function need to be thread safe since it is called in the Runnable
*/
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
Optional<State> topicSpecificState, Optional<Set<Integer>> filteredPartitions) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();

List<WorkUnit> workUnits = Lists.newArrayList();
List<KafkaPartition> topicPartitions = topic.getPartitions();
for (KafkaPartition partition : topicPartitions) {
if(filteredPartitions.isPresent() && !filteredPartitions.get().contains(partition.getId())) {
continue;
}
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
Expand Down Expand Up @@ -895,13 +948,20 @@ private class WorkUnitCreator implements Runnable {
private final SourceState state;
private final Optional<State> topicSpecificState;
private final Map<String, List<WorkUnit>> allTopicWorkUnits;
private final Optional<Set<Integer>> filteredPartitionsId;

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this(topic, state, topicSpecificState, workUnits, Optional.absent());
}

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits, Optional<Set<Integer>> filteredPartitionsId) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
this.filteredPartitionsId = filteredPartitionsId;
}

@Override
Expand All @@ -917,7 +977,7 @@ public void run() {
}

this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState, this.filteredPartitionsId));
} catch (Throwable t) {
LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
throw new RuntimeException(t);
Expand All @@ -930,4 +990,4 @@ public void run() {
}
}
}
}
}
Loading