Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
7d3ccfe
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
c9e88a2
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
9fd9d99
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
20f4470
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
4d16aa7
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
71588eb
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
58741b5
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
6017207
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
29400e5
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
164f1c0
Merge pull request #2 from Daniilchik/HDDS-3498-1
Daniilchik Oct 2, 2024
83fbbbe
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 2, 2024
4a46f4d
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 3, 2024
4b4a582
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 3, 2024
6253cb1
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 3, 2024
d9536ab
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 3, 2024
3bb033d
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 4, 2024
0ac758e
HDDS-3498. Shutdown datanode if address is already in use
Daniilchik Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,17 @@ public void logIfNeeded(Exception ex) {
}

if (missCounter == 0) {
LOG.warn(
"Unable to communicate to {} server at {} for past {} seconds.",
serverName,
getAddress().getHostString() + ":" + getAddress().getPort(),
TimeUnit.MILLISECONDS.toSeconds(this.getMissedCount() *
getScmHeartbeatInterval(this.conf)), ex);
long missedDurationSeconds = TimeUnit.MILLISECONDS.toSeconds(
this.getMissedCount() * getScmHeartbeatInterval(this.conf)
);
LOG.error(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you changed it to error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the remark. I’ll change it back. I initially changed it at the beginning of working on the task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"Unable to communicate to {} server at {}:{} for past {} seconds.",
serverName,
address.getAddress(),
address.getPort(),
missedDurationSeconds,
ex
);
}

if (LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.BindException;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
Expand Down Expand Up @@ -104,7 +105,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
LOG.debug("Cannot execute GetVersion task as endpoint state machine " +
"is in {} state", rpcEndPoint.getState());
}
} catch (DiskOutOfSpaceException ex) {
} catch (DiskOutOfSpaceException | BindException ex) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
} catch (IOException ex) {
rpcEndPoint.logIfNeeded(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.BindException;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
Expand Down Expand Up @@ -185,7 +186,16 @@ public HddsProtos.ReplicationType getServerType() {
@Override
public void start() throws IOException {
if (!isStarted) {
server.start();
try {
server.start();
} catch (IOException e) {
LOG.error("Error while starting the server", e);
if (e.getMessage().contains("Failed to bind to address")) {
Copy link
Contributor

@ivanzlenko ivanzlenko Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any actual reason to start a service if we can't start a server? Shouldn't we just handle all IOException like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we will handle all IOException like that we will end up with shutting down datanodes due to any network failures.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we identify how actual retryable exceptions looks like? Is netty throwing specifically only IOExceptions here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it is better just handle correctly server.start part and shutdown everything here cause we can't start server. There is no point to live afterwards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed it with Daniil - we need to return to this code at some point in time to refactor it.

throw new BindException(e);
} else {
throw e;
}
}
int realPort = server.getPort();

if (port == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;

import java.io.IOException;

/**
* Exception used to indicate a problem with binding a port.
* Typically, the port is in use.
*/
public class BindException extends IOException {
public BindException() {
}

public BindException(String message) {
super(message);
}

public BindException(String message, Throwable cause) {
super(message, cause);
}

public BindException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
.getReuseableAddress();
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
nonExistentServerAddress, 1000)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
DatanodeDetails datanodeDetails = randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(datanodeDetails,
conf, ContainerTestUtils.getMockContext(datanodeDetails, ozoneConf));
Expand All @@ -336,7 +336,7 @@ public void testGetVersionToInvalidEndpoint() throws Exception {

// This version call did NOT work, so endpoint should remain in the same
// state.
assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, newState);
assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN, newState);
}
}

Expand All @@ -353,7 +353,7 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {

try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, (int) rpcTimeout)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
DatanodeDetails datanodeDetails = randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(datanodeDetails, conf,
ContainerTestUtils.getMockContext(datanodeDetails, ozoneConf));
Expand All @@ -366,7 +366,7 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {
long end = Time.monotonicNow();
scmServerImpl.setRpcResponseDelay(0);
assertThat(end - start).isLessThanOrEqualTo(rpcTimeout + tolerance);
assertEquals(EndpointStateMachine.EndPointStates.GETVERSION, newState);
assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN, newState);
}
}

Expand Down