MultiWorkUnit.java
@@ -155,4 +155,15 @@ public int hashCode() {
public static MultiWorkUnit createEmpty() {
return new MultiWorkUnit();
}

/**
* Create a new {@link MultiWorkUnit} instance based on the provided collection of {@link WorkUnit}s.
*
* @param workUnits the {@link WorkUnit}s to include in the new instance
* @return a {@link MultiWorkUnit} instance containing the provided collection of {@link WorkUnit}s.
*/
public static MultiWorkUnit createMultiWorkUnit(Collection<WorkUnit> workUnits) {
MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
multiWorkUnit.addWorkUnits(workUnits);
return multiWorkUnit;
}
}
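For reference, a minimal usage sketch of the new factory method (illustrative only, not part of this change; assumes the usual java.util imports):

// Group two freshly created WorkUnits into a single MultiWorkUnit via the new factory.
List<WorkUnit> workUnits = new ArrayList<>();
workUnits.add(WorkUnit.createEmpty());
workUnits.add(WorkUnit.createEmpty());
MultiWorkUnit grouped = MultiWorkUnit.createMultiWorkUnit(workUnits);
// The returned instance carries both WorkUnits.
assert grouped.getWorkUnits().size() == 2;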
KafkaTopicGroupingWorkUnitPacker.java
@@ -20,9 +20,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;

@@ -171,6 +173,9 @@ public KafkaTopicGroupingWorkUnitPacker(AbstractSource<?, ?> source, SourceState
* - For each topic pack the workunits into a set of topic specific buckets by filling the fullest bucket that can hold
* the workunit without exceeding the container capacity.
* - The topic specific multi-workunits are squeezed and returned as a workunit.
*
* @param numContainers the desired number of containers, which is the target size of the returned {@code List<WorkUnit>}.
* The actual number can be smaller or larger depending on the container capacity and the total number of work units/partitions.
*/
@Override
public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
@@ -228,6 +233,11 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int num
}
}

// If the number of mwuGroups is smaller than numContainers, try to further split the multi-workunits to respect the requested container count
if(mwuGroups.size() < numContainers) {
mwuGroups = splitMultiWorkUnits(mwuGroups, numContainers);
}

List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
log.debug("Squeezed work unit groups: " + squeezedGroups);
return squeezedGroups;
@@ -381,4 +391,40 @@ static double getContainerCapacityForTopic(List<Double> capacities, ContainerCap
throw new RuntimeException("Unsupported computation strategy: " + strategy.name());
}
}

/**
* Splits a list of {@link MultiWorkUnit}s into desiredWUSize groups if possible. The approach is to evenly split
* the {@link WorkUnit}s within a MWU into two halves, always splitting the MWU with the most partitions first.
* Splitting stops when every {@link MultiWorkUnit} contains a single {@link WorkUnit}, as no further split is possible.
* @param multiWorkUnits the list of {@link MultiWorkUnit}s to be split
* @param desiredWUSize the desired number of {@link MultiWorkUnit}s
* @return the split list of {@link MultiWorkUnit}s
*/
public static List<MultiWorkUnit> splitMultiWorkUnits(List<MultiWorkUnit> multiWorkUnits, int desiredWUSize) {
PriorityQueue<MultiWorkUnit> pQueue = new PriorityQueue<>(
Comparator.comparing(mwu -> mwu.getWorkUnits().size(), Comparator.reverseOrder()));
pQueue.addAll(multiWorkUnits);

while(pQueue.size() < desiredWUSize) {
MultiWorkUnit mwu = pQueue.poll();
int size = mwu.getWorkUnits().size();
// If the largest mwu holds fewer than 2 work units, every mwu contains a single wu and can't be further split.
// Add back the polled element and stop the loop.
if(size < 2) {
pQueue.add(mwu);
break;
}
// Split the mwu into two halves with the work units evenly distributed
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(0, size/2)));
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(size/2, size)));
}

log.info("Min size of the container is set to {}, successfully split the multi workunit to {}", desiredWUSize, pQueue.size());

// If the size is unchanged, no split was done. Return the original list to avoid constructing a new one.
if(multiWorkUnits.size() == pQueue.size()) {
return multiWorkUnits;
}
return new ArrayList<>(pQueue);
}
}
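To make the split behavior concrete, here is a minimal sketch of splitMultiWorkUnits in isolation (illustrative only, not part of this change; assumes the usual java.util imports). The largest group is always halved first, and splitting stops once every group holds a single WorkUnit, so asking for more groups than there are WorkUnits caps the result at one WorkUnit per group:

// One MultiWorkUnit holding 4 WorkUnits.
MultiWorkUnit mwu = MultiWorkUnit.createEmpty();
for (int i = 0; i < 4; i++) {
  mwu.addWorkUnit(WorkUnit.createEmpty());
}
List<MultiWorkUnit> groups = new ArrayList<>();
groups.add(mwu);

// Ask for 3 groups: the 4-WU group is halved into 2 + 2, then one half is halved into 1 + 1.
List<MultiWorkUnit> split = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(groups, 3);
assert split.size() == 3;  // group sizes: 2, 1, 1

// Ask for 10 groups: only 4 WorkUnits exist, so the split stops at 4 singleton groups.
split = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(groups, 10);
assert split.size() == 4;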
KafkaTopicGroupingWorkUnitPackerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -51,7 +53,7 @@ public void setUp() {
}

/**
* Check that capacity is honored.
* Check that capacity is honored. Set numContainers to 0 so that the work unit list size is determined only by the capacity.
*/
@Test
public void testSingleTopic() {
@@ -64,7 +66,7 @@ public void testSingleTopic() {
.newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
getWorkUnitWithTopicPartition("topic1", 3)));

List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 2);
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
@@ -91,7 +93,7 @@ public void testMultiTopic() {
.newArrayList(getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2),
getWorkUnitWithTopicPartition("topic2", 3)));

List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 4);

Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
@@ -113,8 +115,49 @@ public void testMultiTopic() {
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY), 2, 0.001);
}

@Test
public void testMultiTopicWithNumContainers() {
KafkaSource source = new UniversalKafkaSource();
SourceState state = new SourceState(new State(props));
state.setProp("gobblin.kafka.streaming.enableIndexing", true);
state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());

Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of(
"topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1),
getWorkUnitWithTopicPartition("topic1", 2)),
"topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1),
getWorkUnitWithTopicPartition("topic2", 2),
getWorkUnitWithTopicPartition("topic2", 3),
getWorkUnitWithTopicPartition("topic2", 4)));
KafkaTopicGroupingWorkUnitPacker packer = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent());
List<WorkUnit> workUnits = packer.pack(workUnitsByTopic, 5);
Assert.assertEquals(workUnits.size(), 5);

int partitionCount = 0;
for(WorkUnit workUnit : workUnits) {
partitionCount += KafkaUtils.getPartitions(workUnit).size();
}
Assert.assertEquals(partitionCount, 6);

workUnitsByTopic = ImmutableMap.of(
"topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1),
getWorkUnitWithTopicPartition("topic1", 2)),
"topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1),
getWorkUnitWithTopicPartition("topic2", 2),
getWorkUnitWithTopicPartition("topic2", 3),
getWorkUnitWithTopicPartition("topic2", 4)));
workUnits = packer.pack(workUnitsByTopic, 7);
// The total number of work units is 6, so the result is capped at 6 even though 7 containers were requested
Assert.assertEquals(workUnits.size(), 6);
partitionCount = 0;
for(WorkUnit workUnit : workUnits) {
partitionCount += KafkaUtils.getPartitions(workUnit).size();
}
Assert.assertEquals(partitionCount, 6);
}

public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
@@ -159,4 +202,33 @@ public void testGetContainerCapacityForTopic() {
capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
Assert.assertEquals(capacity, 1.35, delta);
}

@Test
public void testSplitMultiWorkUnits() {
// Create a list of 2 MWUs, each containing 3 WUs
List<MultiWorkUnit> multiWorkUnitList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
for (int j = 0; j < 3; j++) {
multiWorkUnit.addWorkUnit(WorkUnit.createEmpty());
}
multiWorkUnitList.add(multiWorkUnit);
}

// desiredWUSize is smaller than the number of MWUs, so the result keeps the original list size of 2
List<MultiWorkUnit> mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 1);
Assert.assertEquals(mwuList.size(), 2);

mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 3);
Assert.assertEquals(mwuList.size(), 3);

mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 6);
Assert.assertEquals(mwuList.size(), 6);

// desiredWUSize is bigger than the total number of WUs, so the result size is capped at the total WU count (6)
mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 7);
Assert.assertEquals(mwuList.size(), 6);
}

}