diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index fb28a0f4b1..a54a7cfe01 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -464,6 +464,10 @@ static Token deserializeServiceData(ByteBuffer secret) throw return jt; } + public int getPort() { + return port; + } + @Override public void initializeApplication(ApplicationInitializationContext context) { @@ -537,7 +541,7 @@ public Thread newThread(Runnable r) { return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet()); } }); - + port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); super.serviceInit(new YarnConfiguration(conf)); } @@ -556,7 +560,6 @@ protected void serviceStart() throws Exception { conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE)) .childOption(ChannelOption.SO_KEEPALIVE, true); initPipeline(bootstrap, conf); - port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); Channel ch = bootstrap.bind().sync().channel(); accepted.add(ch); diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 21addd393e..6a2e1cc68e 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.Checksum; @@ -1432,4 +1433,36 @@ public FullHttpRequest createHttpRequest() { } return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); } + + @Test + public void testConfigPortStatic() throws Exception { + Random rand = new Random(); + int port = rand.nextInt(10) + 50000; + Configuration conf = new Configuration(); + // provide a port for ShuffleHandler + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, port); + MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + shuffleHandler.serviceInit(conf); + try { + shuffleHandler.serviceStart(); + Assert.assertEquals(port, shuffleHandler.getPort()); + } finally { + shuffleHandler.stop(); + } + } + + @Test + public void testConfigPortDynamic() throws Exception { + Configuration conf = new Configuration(); + // 0 as config, should be dynamically chosen by netty + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + shuffleHandler.serviceInit(conf); + try { + shuffleHandler.serviceStart(); + Assert.assertTrue("ShuffleHandler should use a random chosen port", shuffleHandler.getPort() > 0); + } finally { + shuffleHandler.stop(); + } + } }