Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ public boolean reduceParallelismEnabled() {
return equals(FairRoutingType.REDUCE_PARALLELISM);
}

public boolean fairParallelismEnabled() {
return equals(FairRoutingType.FAIR_PARALLELISM);
}

public boolean enabled() {
return !equals(FairRoutingType.NONE);
}
Expand Down Expand Up @@ -192,18 +188,6 @@ public FairShuffleVertexManager(VertexManagerPluginContext context) {
super(context);
}

@Override
protected void onVertexStartedCheck() {
super.onVertexStartedCheck();
if (bipartiteSources > 1 &&
(mgrConfig.getFairRoutingType().fairParallelismEnabled())) {
// TODO TEZ-3500
throw new TezUncheckedException(
"Having more than one destination task process same partition(s) " +
"only works with one bipartite source.");
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect this assertion is not must because Tez users currently use FairShuffleVertexManager in specific cases on purpose.
I guess we need to add a new DataMovementType or new FairShuffleVertexManager for JOIN, but we don't immediately need it.

}

static long ceil(long a, long b) {
return (a + (b - 1)) / b;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.FairRoutingType;
import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.FairShuffleVertexManagerConfig;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void testAutoParallelismConfig() throws Exception {
}

@Test(timeout = 5000)
public void testInvalidSetup() {
public void testFairAutoParallelismConfig() {
Configuration conf = new Configuration();
ShuffleVertexManagerBase manager;

Expand All @@ -96,17 +96,15 @@ public void testInvalidSetup() {
"Vertex1", 2, "Vertex2", 2, "Vertex3", 2,
"Vertex4", 4, scheduledTasks, null);

// fail if there are more than one bipartite for FAIR_PARALLELISM
try {
manager = createFairShuffleVertexManager(conf, mockContext,
FairRoutingType.FAIR_PARALLELISM, 1000 * MB, 0.001f, 0.001f);
manager.onVertexStarted(emptyCompletions);
Assert.assertFalse(true);
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains(
"Having more than one destination task process same partition(s) " +
"only works with one bipartite source."));
}
manager = createFairShuffleVertexManager(conf, mockContext,
FairRoutingType.FAIR_PARALLELISM, 1000 * MB, null, 0.5f);
verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig
FairShuffleVertexManagerConfig config = (FairShuffleVertexManagerConfig) manager.config;
Assert.assertTrue(config.isAutoParallelismEnabled());
Assert.assertTrue(config.fairRoutingType == FairRoutingType.FAIR_PARALLELISM);
Assert.assertTrue(config.getDesiredTaskInputDataSize() == 1000l * MB);
Assert.assertTrue(config.getMinFraction() == 0.25f);
Assert.assertTrue(config.getMaxFraction() == 0.5f);
}

@Test(timeout = 5000)
Expand Down