Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ServiceException;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -709,21 +710,26 @@ public static int roundupMb(long bytes) {
* or a RpcException.
*/
public static Throwable getUnwrappedException(Exception ex) {
Throwable t = ex;
if (ex instanceof ServiceException) {
Throwable t = ex.getCause();
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
while (t != null) {
if (t instanceof RpcException ||
t instanceof AccessControlException ||
t instanceof SecretManager.InvalidToken) {
return t;
}
t = t.getCause();
t = ex.getCause();
}
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
while (t != null) {
if (t instanceof RpcException ||
t instanceof AccessControlException ||
t instanceof SecretManager.InvalidToken) {
break;
}
Throwable cause = t.getCause();
if (cause == null || cause instanceof RemoteException) {
break;
}
t = cause;
}
return null;
return t;
}

/**
Expand All @@ -743,7 +749,7 @@ public static boolean shouldNotFailoverOnRpcException(Throwable exception) {
return true;
}
}
return false;
return exception instanceof InvalidProtocolBufferException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ public enum ClientVersion implements ComponentVersion {
"This client version has support for Object Store and File " +
"System Optimized Bucket Layouts."),

EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST(4,
"This client version enforces replica index is set for fixing read corruption that could occur when " +
"replicaIndex parameter is not validated before EC block reads."),

FUTURE_VERSION(-1, "Used internally when the server side is older and an"
+ " unknown client version has arrived from the client.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@
import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto.State.RECOVERING;
import static org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;

import org.apache.ratis.statemachine.StateMachine;
Expand Down Expand Up @@ -563,15 +562,6 @@ ContainerCommandResponseProto handlePutBlock(
return putBlockResponseSuccess(request, blockDataProto);
}

/**
* Checks if a replicaIndex needs to be checked based on the client version for a request.
* @param request ContainerCommandRequest object.
* @return true if the validation is required for the client version else false.
*/
private boolean replicaIndexCheckRequired(ContainerCommandRequestProto request) {
return request.hasVersion() && request.getVersion() >= EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue();
}

/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
Expand All @@ -590,9 +580,7 @@ ContainerCommandResponseProto handleGetBlock(
try {
BlockID blockID = BlockID.getFromProtobuf(
request.getGetBlock().getBlockID());
if (replicaIndexCheckRequired(request)) {
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
}
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
responseData = blockManager.getBlock(kvContainer, blockID).getProtoBufMessage();
final long numBytes = responseData.getSerializedSize();
metrics.incContainerBytesStats(Type.GetBlock, numBytes);
Expand Down Expand Up @@ -709,9 +697,7 @@ ContainerCommandResponseProto handleReadChunk(
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk()
.getChunkData());
Preconditions.checkNotNull(chunkInfo);
if (replicaIndexCheckRequired(request)) {
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
}
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
BlockUtils.verifyBCSId(kvContainer, blockID);

if (dispatcherContext == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ public static void verifyBCSId(Container container, BlockID blockID)
public static void verifyReplicaIdx(Container container, BlockID blockID)
throws IOException {
Integer containerReplicaIndex = container.getContainerData().getReplicaIndex();
if (containerReplicaIndex > 0 && !containerReplicaIndex.equals(blockID.getReplicaIndex())) {
Integer blockReplicaIndex = blockID.getReplicaIndex();
if (containerReplicaIndex > 0 && blockReplicaIndex != null && blockReplicaIndex != 0 &&
!containerReplicaIndex.equals(blockReplicaIndex)) {
throw new StorageContainerException(
"Unable to find the Container with replicaIdx " + blockID.getReplicaIndex() + ". Container "
+ container.getContainerData().getContainerID() + " replicaIdx is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public void testGetBlockWithReplicaIndexMismatch(ClientVersion clientVersion, in
handler.handleGetBlock(
getDummyCommandRequestProto(clientVersion, ContainerProtos.Type.GetBlock, rid),
container);
assertEquals((replicaIndex > 0 && rid != replicaIndex && clientVersion.toProtoValue() >=
ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue()) ?
assertEquals((replicaIndex > 0 && rid != 0 && rid != replicaIndex) ?
ContainerProtos.Result.CONTAINER_NOT_FOUND : UNKNOWN_BCSID,
response.getResult());
}
Expand Down Expand Up @@ -158,8 +157,7 @@ public void testReadChunkWithReplicaIndexMismatch(ClientVersion clientVersion, i
ContainerProtos.ContainerCommandResponseProto response =
handler.handleReadChunk(getDummyCommandRequestProto(clientVersion, ContainerProtos.Type.ReadChunk, rid),
container, null);
assertEquals((replicaIndex > 0 && rid != replicaIndex &&
clientVersion.toProtoValue() >= ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue()) ?
assertEquals((replicaIndex > 0 && rid != 0 && rid != replicaIndex) ?
ContainerProtos.Result.CONTAINER_NOT_FOUND : UNKNOWN_BCSID,
response.getResult());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.hadoop.ozone.freon.FreonReplicationOptions;
import org.apache.hadoop.ozone.loadgenerators.LoadGenerator;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
Expand Down Expand Up @@ -117,7 +117,7 @@ enum AllowedBucketLayouts { FILE_SYSTEM_OPTIMIZED, OBJECT_STORE }
private static final String OM_SERVICE_ID = "ozoneChaosTest";
private static final String SCM_SERVICE_ID = "scmChaosTest";

@BeforeClass
@BeforeAll
public static void init() throws Exception {
OzoneConfiguration configuration = new OzoneConfiguration();

Expand Down Expand Up @@ -191,7 +191,7 @@ static void setNumManagers(int nOms, int numScms, boolean enableHA) {
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
@AfterAll
public static void shutdown() {
if (loadGenerator != null) {
loadGenerator.shutdownLoadGenerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,26 @@

import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.apache.ozone.test.JUnit5AwareTimeout;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
* Class to test CommitWatcher functionality.
*/
@Timeout(value = 300, unit = TimeUnit.SECONDS)
public class TestCommitWatcher {

/**
* Set a timeout for each test.
*/
@Rule
public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(300));
private MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
private OzoneClient client;
Expand All @@ -104,7 +97,7 @@ public class TestCommitWatcher {
*
* @throws IOException
*/
@Before
@BeforeEach
public void init() throws Exception {
chunkSize = (int)(1 * OzoneConsts.MB);
flushSize = (long) 2 * chunkSize;
Expand Down Expand Up @@ -165,7 +158,7 @@ public void init() throws Exception {
/**
* Shutdown MiniDFSCluster.
*/
@After
@AfterEach
public void shutdown() {
IOUtils.closeQuietly(client);
if (cluster != null) {
Expand Down Expand Up @@ -324,11 +317,12 @@ public void testReleaseBuffersOnException() throws Exception {
// can itself get AlreadyClosedException from the Ratis Server
// and the write may fail with RaftRetryFailureException
Throwable t = HddsClientUtils.checkForException(ioe);
assertTrue("Unexpected exception: " + t.getClass(),
assertTrue(
t instanceof RaftRetryFailureException ||
t instanceof TimeoutIOException ||
t instanceof AlreadyClosedException ||
t instanceof NotReplicatedException);
t instanceof NotReplicatedException,
"Unexpected exception: " + t.getClass());
}
if (ratisClient.getReplicatedMinCommitIndex() < replies.get(1)
.getLogIndex()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void testCreateRecoveryContainer() throws Exception {
int replicaIndex = 4;
XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
replicaIndex));
2));
try {
// To create the actual situation, container would have been in closed
// state at SCM.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,10 @@
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.apache.ozone.test.JUnit5AwareTimeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,6 +62,7 @@
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

Expand Down Expand Up @@ -93,14 +91,15 @@
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.ozone.test.GenericTestUtils.assertExceptionContains;
import static org.apache.ozone.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Integration test to verify block tokens in a secure cluster.
*/
@InterfaceAudience.Private
@Timeout(value = 180, unit = TimeUnit.SECONDS)
public final class TestBlockTokens {
private static final Logger LOG = LoggerFactory
.getLogger(TestBlockTokens.class);
Expand All @@ -111,9 +110,6 @@ public final class TestBlockTokens {
private static final int EXPIRY_DURATION_IN_MS = 10000;
private static final int ROTATION_CHECK_DURATION_IN_MS = 100;

@Rule
public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(180));

private static MiniKdc miniKdc;
private static OzoneConfiguration conf;
private static File workDir;
Expand All @@ -129,7 +125,7 @@ public final class TestBlockTokens {
private static BlockInputStreamFactory blockInputStreamFactory =
new BlockInputStreamFactoryImpl();

@BeforeClass
@BeforeAll
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
Expand Down Expand Up @@ -161,7 +157,7 @@ private static void createTestData() throws IOException {
}
}

@AfterClass
@AfterAll
public static void stop() {
miniKdc.stop();
IOUtils.close(LOG, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.apache.ozone.test.JUnit5AwareTimeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,6 +48,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.time.Duration.between;
Expand All @@ -78,19 +71,20 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Assertions;
/**
* Integration test class to verify block token CLI commands functionality in a
* secure cluster.
*/
@InterfaceAudience.Private
@Timeout(value = 180, unit = TimeUnit.SECONDS)
public final class TestBlockTokensCLI {
private static final Logger LOG = LoggerFactory
.getLogger(TestBlockTokensCLI.class);

@Rule
public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(180));

private static MiniKdc miniKdc;
private static OzoneAdmin ozoneAdmin;
private static OzoneConfiguration conf;
Expand All @@ -105,7 +99,7 @@ public final class TestBlockTokensCLI {
private static MiniOzoneHAClusterImpl cluster;
private static OzoneClient client;

@BeforeClass
@BeforeAll
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
Expand All @@ -128,7 +122,7 @@ public static void init() throws Exception {
ozoneAdmin = new OzoneAdmin(conf);
}

@AfterClass
@AfterAll
public static void stop() {
miniKdc.stop();
IOUtils.close(LOG, client);
Expand Down
Loading
Loading