Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
Expand All @@ -38,6 +39,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;

import io.opentracing.Scope;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.thirdparty.io.grpc.BindableService;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
Expand Down Expand Up @@ -70,6 +72,8 @@ public final class XceiverServerGrpc extends XceiverServer {
private Server server;
private final ContainerDispatcher storageContainer;
private boolean isStarted;
private DatanodeDetails datanodeDetails;


/**
* Constructs a Grpc server class.
Expand All @@ -83,25 +87,15 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
Preconditions.checkNotNull(conf);

this.id = datanodeDetails.getUuid();
this.datanodeDetails = datanodeDetails;
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
// Get an available port on current node and
// use that as the container port

if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket()) {
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
this.port = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", this.port);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", this.port, e);
}
this.port = 0;
}
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));

NettyServerBuilder nettyServerBuilder =
((NettyServerBuilder) ServerBuilder.forPort(port))
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
Expand Down Expand Up @@ -164,6 +158,19 @@ public HddsProtos.ReplicationType getServerType() {
public void start() throws IOException {
if (!isStarted) {
server.start();
int realPort = server.getPort();

if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
this.id, realPort);
port = realPort;
}

//register the real port to the datanode details.
datanodeDetails.setPort(DatanodeDetails
.newPort(Name.STANDALONE,
realPort));

isStarted = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -101,7 +102,7 @@ private static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}

private final int port;
private int port;
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
private final List<ExecutorService> executors;
Expand All @@ -112,13 +113,15 @@ private static long nextCallId() {
private long nodeFailureTimeoutMs;
private final long cacheEntryExpiryInteval;
private boolean isStarted = false;
private DatanodeDetails datanodeDetails;

private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext
context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
throws IOException {
super(conf, caClient);
Objects.requireNonNull(dd, "id == null");
datanodeDetails = dd;
this.port = port;
RaftProperties serverProperties = newRaftProperties(conf);
final int numWriteChunkThreads = conf.getInt(
Expand Down Expand Up @@ -403,21 +406,11 @@ public static XceiverServerRatis newXceiverServerRatis(
if (ozoneConf.getBoolean(OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
try (ServerSocket socket = new ServerSocket()) {
socket.setReuseAddress(true);
SocketAddress address = new InetSocketAddress(0);
socket.bind(address);
localPort = socket.getLocalPort();
LOG.info("Found a free port for the server : {}", localPort);
} catch (IOException e) {
LOG.error("Unable find a random free port for the server, "
+ "fallback to use default port {}", localPort, e);
}
localPort = 0;
}
GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
new SecurityConfig(ozoneConf));
datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));

return new XceiverServerRatis(datanodeDetails, localPort,
dispatcher, ozoneConf, context, tlsConfig, caClient);
}
Expand All @@ -429,6 +422,22 @@ public void start() throws IOException {
server.getId(), getIPCPort());
chunkExecutor.prestartAllCoreThreads();
server.start();

int realPort =
((RaftServerProxy) server).getServerRpc().getInetSocketAddress()
.getPort();

if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
server.getId(), realPort);
port = realPort;
}

//register the real port to the datanode details.
datanodeDetails.setPort(DatanodeDetails
.newPort(DatanodeDetails.Port.Name.RATIS,
realPort));

isStarted = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -194,25 +195,50 @@ public void testContainerRandomPort() throws IOException {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf, null, null)
) {
List<DatanodeStateMachine> stateMachines = new ArrayList<>();
try {

for (int i = 0; i < 3; i++) {
stateMachines.add(new DatanodeStateMachine(
TestUtils.randomDatanodeDetails(), ozoneConf, null, null));
}

//we need to start all the servers to get the fix ports
for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getReadChannel().start();
dsm.getContainer().getWriteChannel().start();

}

for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getWriteChannel().stop();
dsm.getContainer().getReadChannel().stop();

}

//after the start the real port numbers should be available AND unique
HashSet<Integer> ports = new HashSet<Integer>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort()));
assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
for (DatanodeStateMachine dsm : stateMachines) {
int readPort = dsm.getContainer().getReadChannel().getIPCPort();

assertNotEquals("Port number of the service is not updated", 0,
readPort);

// Assert that ratis is also on a different port.
assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort()));
assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort()));
assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort()));
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(readPort));

int writePort = dsm.getContainer().getWriteChannel().getIPCPort();

assertNotEquals("Port number of the service is not updated", 0,
writePort);
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(writePort));
}

} finally {
for (DatanodeStateMachine dsm : stateMachines) {
dsm.close();
}
}

// Turn off the random port flag and test again
Expand Down