From 35697ad79ebba07d854d2fa578a4ba16d98601b6 Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 7 Aug 2023 17:15:17 +0900 Subject: [PATCH] TEZ-4508: Allow the FAIR_PARALLELISM mode to accept multiple source vertices --- .../FairShuffleVertexManager.java | 16 ------------- .../TestFairShuffleVertexManager.java | 24 +++++++++---------- 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java index af4e5b8b26..e28bce935d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.java @@ -144,10 +144,6 @@ public boolean reduceParallelismEnabled() { return equals(FairRoutingType.REDUCE_PARALLELISM); } - public boolean fairParallelismEnabled() { - return equals(FairRoutingType.FAIR_PARALLELISM); - } - public boolean enabled() { return !equals(FairRoutingType.NONE); } @@ -192,18 +188,6 @@ public FairShuffleVertexManager(VertexManagerPluginContext context) { super(context); } - @Override - protected void onVertexStartedCheck() { - super.onVertexStartedCheck(); - if (bipartiteSources > 1 && - (mgrConfig.getFairRoutingType().fairParallelismEnabled())) { - // TODO TEZ-3500 - throw new TezUncheckedException( - "Having more than one destination task process same partition(s) " + - "only works with one bipartite source."); - } - } - static long ceil(long a, long b) { return (a + (b - 1)) / b; } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java index 5108b8f9e6..d87259b45c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java @@ -27,11 +27,11 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.FairRoutingType; +import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.FairShuffleVertexManagerConfig; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.junit.Assert; @@ -86,7 +86,7 @@ public void testAutoParallelismConfig() throws Exception { } @Test(timeout = 5000) - public void testInvalidSetup() { + public void testFairAutoParallelismConfig() { Configuration conf = new Configuration(); ShuffleVertexManagerBase manager; @@ -96,17 +96,15 @@ public void testInvalidSetup() { "Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, null); - // fail if there are more than one bipartite for FAIR_PARALLELISM - try { - manager = createFairShuffleVertexManager(conf, mockContext, - FairRoutingType.FAIR_PARALLELISM, 1000 * MB, 0.001f, 0.001f); - manager.onVertexStarted(emptyCompletions); - Assert.assertFalse(true); - } catch (TezUncheckedException e) { - Assert.assertTrue(e.getMessage().contains( - "Having more than one destination task process same partition(s) " + - "only works with one bipartite source.")); - } + manager = createFairShuffleVertexManager(conf, mockContext, + FairRoutingType.FAIR_PARALLELISM, 1000 * MB, null, 0.5f); + verify(mockContext, times(1)).vertexReconfigurationPlanned(); // Tez notified of reconfig + FairShuffleVertexManagerConfig config = (FairShuffleVertexManagerConfig) manager.config; + Assert.assertTrue(config.isAutoParallelismEnabled()); + Assert.assertTrue(config.fairRoutingType == FairRoutingType.FAIR_PARALLELISM); + Assert.assertTrue(config.getDesiredTaskInputDataSize() == 1000l * MB); + Assert.assertTrue(config.getMinFraction() == 0.25f); + Assert.assertTrue(config.getMaxFraction() == 0.5f); } @Test(timeout = 5000)