diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index bf8d6f102565..387219cf12f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
@@ -35,6 +38,13 @@
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
@@ -61,22 +71,52 @@ public class ReplicationServer {
 
   private int port;
 
+  private int poolSize;
+
+  private int queueLimit;
+
+  private ThreadPoolExecutor executor;
+
+  private EventLoopGroup eventLoopGroup;
+
   public ReplicationServer(
       ContainerController controller,
       ReplicationConfig replicationConfig,
       SecurityConfig secConf,
-      CertificateClient caClient
-  ) {
+      CertificateClient caClient) {
     this.secConf = secConf;
     this.caClient = caClient;
     this.controller = controller;
     this.port = replicationConfig.getPort();
+    this.poolSize = replicationConfig.getReplicationMaxStreams();
+    this.queueLimit = replicationConfig.getReplicationQueueLimit();
     init();
   }
 
   public void init() {
+    executor = new ThreadPoolExecutor(
+        poolSize, poolSize, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(queueLimit),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ReplicationServerExecutor-%d")
+            .build());
+
+    Class<? extends ServerChannel> channelType;
+
+    if (Epoll.isAvailable()) {
+      eventLoopGroup = new EpollEventLoopGroup(poolSize / 4);
+      channelType = EpollServerSocketChannel.class;
+    } else {
+      eventLoopGroup = new NioEventLoopGroup(poolSize / 4);
+      channelType = NioServerSocketChannel.class;
+    }
+
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
+        .workerEventLoopGroup(eventLoopGroup)
+        .bossEventLoopGroup(eventLoopGroup)
+        .channelType(channelType)
+        .executor(executor)
         .addService(ServerInterceptors.intercept(new GrpcReplicationService(
             new OnDemandContainerReplicationSource(controller)
         ), new GrpcServerInterceptor()));
@@ -113,12 +153,14 @@ public void start() throws IOException {
     }
 
     port = server.getPort();
-
   }
 
   public void stop() {
     try {
-      server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+      eventLoopGroup.shutdownGracefully().sync();
+      executor.shutdown();
+      executor.awaitTermination(5L, TimeUnit.SECONDS);
+      server.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
     } catch (InterruptedException ex) {
       LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
       Thread.currentThread().interrupt();
@@ -141,7 +183,7 @@ public static final class ReplicationConfig {
     public static final String REPLICATION_STREAMS_LIMIT_KEY =
         PREFIX + "." + STREAMS_LIMIT_KEY;
 
-    public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
+    public static final int REPLICATION_MAX_STREAMS_DEFAULT = 12;
 
     /**
      * The maximum number of replication commands a single datanode can execute
@@ -149,13 +191,25 @@ public static final class ReplicationConfig {
      */
     @Config(key = STREAMS_LIMIT_KEY,
         type = ConfigType.INT,
-        defaultValue = "10",
+        defaultValue = "12",
         tags = {DATANODE},
         description = "The maximum number of replication commands a single " +
             "datanode can execute simultaneously"
     )
     private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
 
+    /**
+     * The maximum number of replication requests.
+     */
+    @Config(key = "queue.limit",
+        type = ConfigType.INT,
+        defaultValue = "4096",
+        tags = {DATANODE},
+        description = "The maximum number of queued requests for container " +
+            "replication"
+    )
+    private int replicationQueueLimit = 4096;
+
     @Config(key = "port",
         defaultValue = "9886",
         description = "Port used for the server2server replication server",
         tags = {DATANODE, MANAGEMENT})
@@ -178,6 +232,14 @@ public void setReplicationMaxStreams(int replicationMaxStreams) {
       this.replicationMaxStreams = replicationMaxStreams;
     }
 
+    public int getReplicationQueueLimit() {
+      return replicationQueueLimit;
+    }
+
+    public void setReplicationQueueLimit(int limit) {
+      this.replicationQueueLimit = limit;
+    }
+
     @PostConstruct
     public void validate() {
       if (replicationMaxStreams < 1) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationServer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationServer.java
new file mode 100644
index 000000000000..697c8a9784fc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test the replication server.
+ */
+public class TestReplicationServer {
+
+  @Test
+  public void testEpollDisabled() {
+    OzoneConfiguration config = new OzoneConfiguration();
+    ReplicationServer.ReplicationConfig replicationConfig =
+        config.getObject(ReplicationServer.ReplicationConfig.class);
+    SecurityConfig secConf = new SecurityConfig(config);
+    System.setProperty(
+        "org.apache.ratis.thirdparty.io.netty.transport.noNative", "true");
+    ReplicationServer server = new ReplicationServer(null, replicationConfig,
+        secConf, null);
+
+    try {
+      server.start();
+    } catch (IOException e) {
+      Assert.fail("Replication Server start should succeed.");
+      e.printStackTrace();
+    } finally {
+      server.stop();
+    }
+  }
+}
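
Usage sketch (not part of the patch): the new queue limit is exposed through the same typed ReplicationConfig object as the existing streams limit, so it can be set and read back as below. This is a minimal illustration only; the fully qualified key "hdds.datanode.replication.queue.limit" is an assumption based on the @Config(key = "queue.limit") annotation and the class's ConfigGroup prefix, so verify it against the actual @ConfigGroup declaration before relying on it.

    // Hypothetical example, not included in the diff above.
    OzoneConfiguration conf = new OzoneConfiguration();
    // REPLICATION_STREAMS_LIMIT_KEY is the constant shown in the patch;
    // the queue-limit key below is an assumed full name (prefix + "queue.limit").
    conf.setInt(
        ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY, 16);
    conf.setInt("hdds.datanode.replication.queue.limit", 8192);

    ReplicationServer.ReplicationConfig replicationConfig =
        conf.getObject(ReplicationServer.ReplicationConfig.class);

    // The server sizes its ThreadPoolExecutor from the streams limit and
    // bounds its LinkedBlockingQueue with the queue limit.
    assert replicationConfig.getReplicationMaxStreams() == 16;
    assert replicationConfig.getReplicationQueueLimit() == 8192;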