diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 021d2fc30905..3042b4d847a0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -201,6 +201,13 @@ public enum ChecksumCombineMode { // 3 concurrent stripe read should be enough. private int ecReconstructStripeReadPoolLimit = 10 * 3; + @Config(key = "ec.reconstruct.stripe.write.pool.limit", + defaultValue = "30", + description = "Maximum thread pool size for writing available" + + " EC chunks in parallel to reconstruct the whole stripe.", + tags = ConfigTag.CLIENT) + private int ecReconstructStripeWritePoolLimit = 10 * 3; + @Config(key = "checksum.combine.mode", defaultValue = "COMPOSITE_CRC", description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] " @@ -387,6 +394,14 @@ public int getEcReconstructStripeReadPoolLimit() { return ecReconstructStripeReadPoolLimit; } + public void setEcReconstructStripeWritePoolLimit(int poolLimit) { + this.ecReconstructStripeWritePoolLimit = poolLimit; + } + + public int getEcReconstructStripeWritePoolLimit() { + return ecReconstructStripeWritePoolLimit; + } + public void setFsDefaultBucketLayout(String bucketLayout) { if (!bucketLayout.isEmpty()) { this.fsDefaultBucketLayout = bucketLayout; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 0c5501c7922c..957f761ccbc2 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -73,7 +73,7 @@ SortedMap> getCommitIndexMap() { return commitIndexMap; } - void updateCommitInfoMap(long index, List buffers) { + synchronized void updateCommitInfoMap(long index, List buffers) { commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index b97165084f6e..94d0a88b358f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -25,9 +25,9 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -144,7 +144,8 @@ public BlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, + Supplier blockOutputStreamResourceProvider ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; @@ -177,8 +178,7 @@ public BlockOutputStream( (long) flushPeriod * 
streamBufferArgs.getStreamBufferSize() == streamBufferArgs .getStreamBufferFlushSize()); - // A single thread executor handle the responses of async requests - responseExecutor = Executors.newSingleThreadExecutor(); + this.responseExecutor = blockOutputStreamResourceProvider.get(); bufferList = null; totalDataFlushedLength = 0; writtenDataLength = 0; @@ -660,7 +660,6 @@ public void cleanup(boolean invalidateClient) { bufferList.clear(); } bufferList = null; - responseExecutor.shutdown(); } /** diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 3c7f8a2360c8..aa339409eceb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -24,6 +24,7 @@ */ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; /** * This class executes watchForCommit on ratis pipeline and releases @@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher { private final BufferPool bufferPool; // future Map to hold up all putBlock futures - private final ConcurrentMap> futureMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> + futureMap = new ConcurrentHashMap<>(); CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) { super(xceiverClient); @@ -67,11 +69,24 @@ void releaseBuffers(long index) { + totalLength + ": existing = " + futureMap.keySet()); } - ConcurrentMap> getFutureMap() { + @VisibleForTesting + ConcurrentMap> getFutureMap() { return futureMap; } + public void putFlushFuture(long flushPos, CompletableFuture flushFuture) { + futureMap.compute(flushPos, + (key, previous) -> previous == null ? 
flushFuture : + previous.thenCombine(flushFuture, (prev, curr) -> curr)); + } + + + public void waitOnFlushFutures() throws InterruptedException, ExecutionException { + // wait for all the transactions to complete + CompletableFuture.allOf(futureMap.values().toArray( + new CompletableFuture[0])).get(); + } + @Override public void cleanup() { super.cleanup(); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java index 1d7fdc1df60b..ebcc37736ad0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java @@ -44,6 +44,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; @@ -75,10 +77,11 @@ public ECBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, + Supplier executorServiceSupplier ) throws IOException { super(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier); // In EC stream, there will be only one node in pipeline. this.datanodeDetails = pipeline.getClosestNode(); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java index ee708bf0de15..4a46efc844cd 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java @@ -39,6 +39,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; /** * An {@link OutputStream} used by the REST service in combination with the @@ -69,8 +71,8 @@ public class RatisBlockOutputStream extends BlockOutputStream /** * Creates a new BlockOutputStream. 
* - * @param blockID block ID - * @param bufferPool pool of buffers + * @param blockID block ID + * @param bufferPool pool of buffers */ @SuppressWarnings("checkstyle:ParameterNumber") public RatisBlockOutputStream( @@ -80,10 +82,11 @@ public RatisBlockOutputStream( BufferPool bufferPool, OzoneClientConfig config, Token token, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs + ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs, + Supplier blockOutputStreamResourceProvider ) throws IOException { super(blockID, xceiverClientManager, pipeline, - bufferPool, config, token, clientMetrics, streamBufferArgs); + bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider); this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient()); } @@ -114,16 +117,13 @@ void updateCommitInfo(XceiverClientReply reply, List buffers) { } @Override - void putFlushFuture(long flushPos, - CompletableFuture flushFuture) { - commitWatcher.getFutureMap().put(flushPos, flushFuture); + void putFlushFuture(long flushPos, CompletableFuture flushFuture) { + commitWatcher.putFlushFuture(flushPos, flushFuture); } @Override void waitOnFlushFutures() throws InterruptedException, ExecutionException { - // wait for all the transactions to complete - CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray( - new CompletableFuture[0])).get(); + commitWatcher.waitOnFlushFutures(); } @Override diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 29c0798df77d..f52359ef6350 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -114,7 +115,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) bufferPool, config, null, - ContainerClientMetrics.acquire(), streamBufferArgs); + ContainerClientMetrics.acquire(), + streamBufferArgs, + () -> newFixedThreadPool(10)); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index bec67347684e..a243b0c7dab1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -135,12 +135,22 @@ public void handle(SCMCommand command, OzoneContainer container, SCMCommandProto.Type.deleteBlocksCommand, command.getType()); return; } - + DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command, + container, context, connectionManager); try { - DeleteCmdInfo cmd = new DeleteCmdInfo((DeleteBlocksCommand) command, - container, context, connectionManager); deleteCommandQueues.add(cmd); } catch (IllegalStateException e) { 
+ String dnId = context.getParent().getDatanodeDetails().getUuidString(); + Consumer updateFailure = (cmdStatus) -> { + cmdStatus.markAsFailed(); + ContainerBlocksDeletionACKProto emptyACK = + ContainerBlocksDeletionACKProto + .newBuilder() + .setDnId(dnId) + .build(); + ((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(emptyACK); + }; + updateCommandStatus(cmd.getContext(), cmd.getCmd(), updateFailure, LOG); LOG.warn("Command is discarded because of the command queue is full"); } } @@ -382,9 +392,13 @@ private void processCmd(DeleteCmdInfo cmd) { } finally { final ContainerBlocksDeletionACKProto deleteAck = blockDeletionACK; - final boolean status = cmdExecuted; + final boolean executedStatus = cmdExecuted; Consumer statusUpdater = (cmdStatus) -> { - cmdStatus.setStatus(status); + if (executedStatus) { + cmdStatus.markAsExecuted(); + } else { + cmdStatus.markAsFailed(); + } ((DeleteBlockCommandStatus)cmdStatus).setBlocksDeletionAck(deleteAck); }; updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 234439a00c24..90756bbc8898 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -35,8 +35,8 @@ import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; -import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.SecurityConfig; +import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.IOUtils; @@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.security.token.Token; +import org.apache.ratis.util.MemoizedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -101,12 +101,14 @@ public class ECReconstructionCoordinator implements Closeable { private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; + private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5; + private final ECContainerOperationClient containerOperationClient; private final ByteBufferPool byteBufferPool; - private final ExecutorService ecReconstructExecutor; - + private final ExecutorService ecReconstructReadExecutor; + private final MemoizedSupplier ecReconstructWriteExecutor; private final BlockInputStreamFactory blockInputStreamFactory; private final TokenHelper tokenHelper; private final ContainerClientMetrics clientMetrics; @@ -123,20 +125,18 @@ 
public ECReconstructionCoordinator( this.containerOperationClient = new ECContainerOperationClient(conf, certificateClient); this.byteBufferPool = new ElasticByteBufferPool(); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d") - .build(); ozoneClientConfig = conf.getObject(OzoneClientConfig.class); - this.ecReconstructExecutor = - new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, - ozoneClientConfig.getEcReconstructStripeReadPoolLimit(), - 60, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - threadFactory, - new ThreadPoolExecutor.CallerRunsPolicy()); + this.ecReconstructReadExecutor = createThreadPoolExecutor( + EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, + ozoneClientConfig.getEcReconstructStripeReadPoolLimit(), + threadNamePrefix + "ec-reconstruct-reader-TID-%d"); + this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf( + () -> createThreadPoolExecutor( + EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE, + ozoneClientConfig.getEcReconstructStripeWritePoolLimit(), + threadNamePrefix + "ec-reconstruct-writer-TID-%d")); this.blockInputStreamFactory = BlockInputStreamFactoryImpl - .getInstance(byteBufferPool, () -> ecReconstructExecutor); + .getInstance(byteBufferPool, () -> ecReconstructReadExecutor); tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient); this.clientMetrics = ContainerClientMetrics.acquire(); this.metrics = metrics; @@ -232,7 +232,7 @@ private ECBlockOutputStream getECBlockOutputStream( containerOperationClient.singleNodePipeline(datanodeDetails, repConfig, replicaIndex), BufferPool.empty(), ozoneClientConfig, - blockLocationInfo.getToken(), clientMetrics, streamBufferArgs); + blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor); } @VisibleForTesting @@ -272,7 +272,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, repConfig, blockLocationInfo, true, this.containerOperationClient.getXceiverClientManager(), null, this.blockInputStreamFactory, byteBufferPool, - this.ecReconstructExecutor)) { + this.ecReconstructReadExecutor)) { ECBlockOutputStream[] targetBlockStreams = new ECBlockOutputStream[toReconstructIndexes.size()]; @@ -457,6 +457,9 @@ public void close() throws IOException { if (containerOperationClient != null) { containerOperationClient.close(); } + if (ecReconstructWriteExecutor.isInitialized()) { + ecReconstructWriteExecutor.get().shutdownNow(); + } } private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig, @@ -590,4 +593,12 @@ OptionalLong getTermOfLeaderSCM() { .map(StateContext::getTermOfLeaderSCM) .orElse(OptionalLong.empty()); } + + private static ExecutorService createThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, String threadNameFormat) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(), + new ThreadPoolExecutor.CallerRunsPolicy()); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java index 4b3ce840dceb..08df150bd8b7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java @@ -63,12 
+63,22 @@ public String getMsg() { * * @param status */ - public void setStatus(Status status) { + private void setStatus(Status status) { this.status = status; } - public void setStatus(boolean cmdExecuted) { - setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED); + /** + * Marks the command status as executed. + */ + public void markAsExecuted() { + setStatus(Status.EXECUTED); + } + + /** + * Marks the command status as failed. + */ + public void markAsFailed() { + setStatus(Status.FAILED); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java index 083a8eb0169f..35f75b842e94 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteBlocksCommandHandler.java @@ -16,25 +16,36 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.SchemaHandler; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.rules.TestRule; import org.junit.rules.Timeout; import org.apache.ozone.test.JUnit5AwareTimeout; @@ -58,6 +69,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyList; import static 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler.DeleteBlockTransactionExecutionResult; @@ -76,6 +88,8 @@ */ @RunWith(Parameterized.class) public class TestDeleteBlocksCommandHandler { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); @Rule public TestRule testTimeout = new JUnit5AwareTimeout(Timeout.seconds(300)); @@ -286,6 +300,43 @@ public void testDeleteCmdWorkerInterval() { Assert.assertEquals(deleteCmdWorker.getInterval(), 4000); } + @Test + public void testDeleteBlockCommandHandleWhenDeleteCommandQueuesFull() + throws IOException { + int blockDeleteQueueLimit = 5; + // Setting up the test environment + OzoneConfiguration configuration = new OzoneConfiguration(); + configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.toString()); + DatanodeDetails datanodeDetails = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeConfiguration dnConf = + configuration.getObject(DatanodeConfiguration.class); + OzoneContainer container = ContainerTestUtils.getOzoneContainer(datanodeDetails, configuration); + DatanodeStateMachine stateMachine = Mockito.mock(DatanodeStateMachine.class); + Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails); + StateContext context = new StateContext(configuration, + Mockito.mock(DatanodeStateMachine.DatanodeStates.class), + stateMachine, ""); + + // Set Queue limit + dnConf.setBlockDeleteQueueLimit(blockDeleteQueueLimit); + handler = new DeleteBlocksCommandHandler( + container, configuration, dnConf, ""); + + // Check if the command status is as expected: PENDING when queue is not full, FAILED when queue is full + for (int i = 0; i < blockDeleteQueueLimit + 2; i++) { + DeleteBlocksCommand deleteBlocksCommand = new DeleteBlocksCommand(emptyList()); + context.addCommand(deleteBlocksCommand); + handler.handle(deleteBlocksCommand, container, context, Mockito.mock(SCMConnectionManager.class)); + CommandStatus cmdStatus = context.getCmdStatus(deleteBlocksCommand.getId()); + if (i < blockDeleteQueueLimit) { + Assert.assertEquals(cmdStatus.getStatus(), Status.PENDING); + } else { + Assert.assertEquals(cmdStatus.getStatus(), Status.FAILED); + Assert.assertEquals(cmdStatus.getProtoBufMessage().getBlockDeletionAck().getResultsCount(), 0); + } + } + } + private DeletedBlocksTransaction createDeletedBlocksTransaction(long txID, long containerID) { return DeletedBlocksTransaction.newBuilder() diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 9bdec27f534f..ba3850ff3947 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -21,6 +21,8 @@ import java.io.OutputStream; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hdds.client.BlockID; @@ -37,6 +39,7 @@ import org.apache.hadoop.security.token.Token; import com.google.common.annotations.VisibleForTesting; +import 
org.apache.ratis.util.JavaUtils; /** * A BlockOutputStreamEntry manages the data writes into the DataNodes. @@ -60,33 +63,30 @@ public class BlockOutputStreamEntry extends OutputStream { private long currentPosition; private final Token token; - private BufferPool bufferPool; - private ContainerClientMetrics clientMetrics; - private StreamBufferArgs streamBufferArgs; - - @SuppressWarnings({"parameternumber", "squid:S00107"}) - BlockOutputStreamEntry( - BlockID blockID, String key, - XceiverClientFactory xceiverClientManager, - Pipeline pipeline, - long length, - BufferPool bufferPool, - Token token, - OzoneClientConfig config, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs - ) { - this.config = config; + private final BufferPool bufferPool; + private final ContainerClientMetrics clientMetrics; + private final StreamBufferArgs streamBufferArgs; + private final Supplier executorServiceSupplier; + + BlockOutputStreamEntry(Builder b) { + this.config = b.config; this.outputStream = null; - this.blockID = blockID; - this.key = key; - this.xceiverClientManager = xceiverClientManager; - this.pipeline = pipeline; - this.token = token; - this.length = length; + this.blockID = b.blockID; + this.key = b.key; + this.xceiverClientManager = b.xceiverClientManager; + this.pipeline = b.pipeline; + this.token = b.token; + this.length = b.length; this.currentPosition = 0; - this.bufferPool = bufferPool; - this.clientMetrics = clientMetrics; - this.streamBufferArgs = streamBufferArgs; + this.bufferPool = b.bufferPool; + this.clientMetrics = b.clientMetrics; + this.streamBufferArgs = b.streamBufferArgs; + this.executorServiceSupplier = b.executorServiceSupplier; + } + + @Override + public String toString() { + return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + blockID; } /** @@ -108,13 +108,18 @@ void checkStream() throws IOException { */ void createOutputStream() throws IOException { outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager, - pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs); + pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, + executorServiceSupplier); } ContainerClientMetrics getClientMetrics() { return clientMetrics; } + Supplier getExecutorServiceSupplier() { + return executorServiceSupplier; + } + StreamBufferArgs getStreamBufferArgs() { return streamBufferArgs; } @@ -361,6 +366,15 @@ public static class Builder { private OzoneClientConfig config; private ContainerClientMetrics clientMetrics; private StreamBufferArgs streamBufferArgs; + private Supplier executorServiceSupplier; + + public Pipeline getPipeline() { + return pipeline; + } + + public long getLength() { + return length; + } public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -402,23 +416,24 @@ public Builder setToken(Token bToken) { this.token = bToken; return this; } + public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { this.clientMetrics = clientMetrics; return this; } + public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) { this.streamBufferArgs = streamBufferArgs; return this; } + public Builder setExecutorServiceSupplier(Supplier executorServiceSupplier) { + this.executorServiceSupplier = executorServiceSupplier; + return this; + } + public BlockOutputStreamEntry build() { - return new BlockOutputStreamEntry(blockID, - key, - xceiverClientManager, - pipeline, - length, - bufferPool, - token, config, clientMetrics, streamBufferArgs); + return new BlockOutputStreamEntry(this); 
} } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index d0f3b5728a8b..51383e8717ab 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -25,8 +25,9 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; -import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ContainerClientMetrics; import org.apache.hadoop.hdds.scm.OzoneClientConfig; @@ -62,7 +63,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { /** * List of stream entries that are used to write a block of data. */ - private final List streamEntries; + private final List streamEntries = new ArrayList<>(); private final OzoneClientConfig config; /** * The actual stream entry we are writing into. Note that a stream entry is @@ -73,7 +74,6 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private final OzoneManagerProtocol omClient; private final OmKeyArgs keyArgs; private final XceiverClientFactory xceiverClientFactory; - private final String requestID; /** * A {@link BufferPool} shared between all * {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by @@ -85,40 +85,34 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware { private final ExcludeList excludeList; private final ContainerClientMetrics clientMetrics; private final StreamBufferArgs streamBufferArgs; + private final Supplier executorServiceSupplier; - @SuppressWarnings({"parameternumber", "squid:S00107"}) - public BlockOutputStreamEntryPool( - OzoneClientConfig config, - OzoneManagerProtocol omClient, - String requestId, ReplicationConfig replicationConfig, - String uploadID, int partNumber, - boolean isMultipart, OmKeyInfo info, - boolean unsafeByteBufferConversion, - XceiverClientFactory xceiverClientFactory, long openID, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs - ) { - this.config = config; - this.xceiverClientFactory = xceiverClientFactory; - streamEntries = new ArrayList<>(); + public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) { + this.config = b.getClientConfig(); + this.xceiverClientFactory = b.getXceiverManager(); currentStreamIndex = 0; - this.omClient = omClient; + this.omClient = b.getOmClient(); + final OmKeyInfo info = b.getOpenHandler().getKeyInfo(); this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName()) .setBucketName(info.getBucketName()).setKeyName(info.getKeyName()) - .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize()) - .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) - .setMultipartUploadPartNumber(partNumber).build(); - this.requestID = requestId; - this.openID = openID; + .setReplicationConfig(b.getReplicationConfig()) + .setDataSize(info.getDataSize()) + .setIsMultipartKey(b.isMultipartKey()) + .setMultipartUploadID(b.getMultipartUploadID()) + .setMultipartUploadPartNumber(b.getMultipartNumber()) + .build(); + this.openID = b.getOpenHandler().getId(); this.excludeList = createExcludeList(); + this.streamBufferArgs = b.getStreamBufferArgs(); 
this.bufferPool = new BufferPool(streamBufferArgs.getStreamBufferSize(), (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs .getStreamBufferSize()), ByteStringConversion - .createByteBufferConversion(unsafeByteBufferConversion)); - this.clientMetrics = clientMetrics; - this.streamBufferArgs = streamBufferArgs; + .createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled())); + this.clientMetrics = b.getClientMetrics(); + this.executorServiceSupplier = b.getExecutorServiceSupplier(); } ExcludeList createExcludeList() { @@ -126,25 +120,6 @@ ExcludeList createExcludeList() { Clock.system(ZoneOffset.UTC)); } - BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics, - OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) { - streamEntries = new ArrayList<>(); - omClient = null; - keyArgs = null; - xceiverClientFactory = null; - config = clientConfig; - streamBufferArgs.setStreamBufferFlushDelay(false); - requestID = null; - int chunkSize = 0; - bufferPool = new BufferPool(chunkSize, 1); - - currentStreamIndex = 0; - openID = -1; - excludeList = createExcludeList(); - this.clientMetrics = clientMetrics; - this.streamBufferArgs = null; - } - /** * When a key is opened, it is possible that there are some blocks already * allocated to it for this open session. In this case, to make use of these @@ -156,10 +131,8 @@ ExcludeList createExcludeList() { * * @param version the set of blocks that are pre-allocated. * @param openVersion the version corresponding to the pre-allocation. - * @throws IOException */ - public void addPreallocateBlocks(OmKeyLocationInfoGroup version, - long openVersion) throws IOException { + public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) { // server may return any number of blocks, (0 to any) // only the blocks allocated in this open session (block createVersion // equals to open session version) @@ -190,6 +163,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setToken(subKeyInfo.getToken()) .setClientMetrics(clientMetrics) .setStreamBufferArgs(streamBufferArgs) + .setExecutorServiceSupplier(executorServiceSupplier) .build(); } @@ -260,6 +234,10 @@ StreamBufferArgs getStreamBufferArgs() { return streamBufferArgs; } + public Supplier getExecutorServiceSupplier() { + return executorServiceSupplier; + } + /** * Discards the subsequent pre allocated blocks and removes the streamEntries * from the streamEntries list for the container which is closed. 
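Note on the CommitWatcher change above: putFlushFuture chains futures registered at the same flush position via compute plus thenCombine, whereas the old commitWatcher.getFutureMap().put(flushPos, flushFuture) call in RatisBlockOutputStream would overwrite, and silently drop, an earlier future registered at the same position. A minimal, self-contained sketch of that semantics; the class name and the Integer payload are illustrative stand-ins, not Ozone types:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class FlushFutureChainingDemo {
  private final ConcurrentMap<Long, CompletableFuture<Integer>> futureMap =
      new ConcurrentHashMap<>();

  // Same shape as CommitWatcher#putFlushFuture: if a future is already
  // registered for this flush position, chain the new one after it so the
  // map tracks the completion of both; a plain put() would drop the earlier
  // future and waitOnFlushFutures() could then return too early.
  void putFlushFuture(long flushPos, CompletableFuture<Integer> flushFuture) {
    futureMap.compute(flushPos, (key, previous) -> previous == null
        ? flushFuture
        : previous.thenCombine(flushFuture, (prev, curr) -> curr));
  }

  void waitOnFlushFutures() throws Exception {
    // Completes only when every registered (possibly chained) future is done.
    CompletableFuture.allOf(
        futureMap.values().toArray(new CompletableFuture[0])).get();
  }

  public static void main(String[] args) throws Exception {
    FlushFutureChainingDemo demo = new FlushFutureChainingDemo();
    CompletableFuture<Integer> first = new CompletableFuture<>();
    CompletableFuture<Integer> second = new CompletableFuture<>();
    demo.putFlushFuture(4096L, first);
    demo.putFlushFuture(4096L, second); // same position: chained, not replaced
    first.complete(1);
    second.complete(2);
    demo.waitOnFlushFutures(); // returns only after both flushes complete
    System.out.println("all flushes acknowledged");
  }
}

Because thenCombine completes only after both of its inputs complete, waitOnFlushFutures() cannot return until every flush registered at a given position has been acknowledged.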
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 07d0f46069ca..241754a57f19 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -23,17 +23,10 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.StreamBufferArgs; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; -import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.security.token.Token; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry { private int currentStreamIdx = 0; private long successfulBlkGrpAckedLen; - @SuppressWarnings({"parameternumber", "squid:S00107"}) - ECBlockOutputStreamEntry(BlockID blockID, String key, - XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length, - BufferPool bufferPool, Token token, - OzoneClientConfig config, ContainerClientMetrics clientMetrics, - StreamBufferArgs streamBufferArgs) { - super(blockID, key, xceiverClientManager, pipeline, length, bufferPool, - token, config, clientMetrics, streamBufferArgs); - assertInstanceOf( - pipeline.getReplicationConfig(), ECReplicationConfig.class); - this.replicationConfig = - (ECReplicationConfig) pipeline.getReplicationConfig(); - this.length = replicationConfig.getData() * length; + ECBlockOutputStreamEntry(Builder b) { + super(b); + this.replicationConfig = assertInstanceOf(b.getPipeline().getReplicationConfig(), ECReplicationConfig.class); + this.length = replicationConfig.getData() * b.getLength(); } @Override @@ -101,7 +85,8 @@ void checkStream() throws IOException { streams[i] = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(), createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1), - getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs()); + getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs(), + getExecutorServiceSupplier()); } blockOutputStreams = streams; } @@ -433,82 +418,9 @@ public ByteString calculateChecksum() throws IOException { /** * Builder class for ChunkGroupOutputStreamEntry. 
* */ - public static class Builder { - private BlockID blockID; - private String key; - private XceiverClientFactory xceiverClientManager; - private Pipeline pipeline; - private long length; - private BufferPool bufferPool; - private Token token; - private OzoneClientConfig config; - private ContainerClientMetrics clientMetrics; - private StreamBufferArgs streamBufferArgs; - - public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) { - this.blockID = bID; - return this; - } - - public ECBlockOutputStreamEntry.Builder setKey(String keys) { - this.key = keys; - return this; - } - - public ECBlockOutputStreamEntry.Builder setXceiverClientManager( - XceiverClientFactory - xClientManager) { - this.xceiverClientManager = xClientManager; - return this; - } - - public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) { - this.pipeline = ppln; - return this; - } - - public ECBlockOutputStreamEntry.Builder setLength(long len) { - this.length = len; - return this; - } - - public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) { - this.bufferPool = pool; - return this; - } - - public ECBlockOutputStreamEntry.Builder setConfig( - OzoneClientConfig clientConfig) { - this.config = clientConfig; - return this; - } - - public ECBlockOutputStreamEntry.Builder setToken( - Token bToken) { - this.token = bToken; - return this; - } - - public ECBlockOutputStreamEntry.Builder setClientMetrics( - ContainerClientMetrics containerClientMetrics) { - this.clientMetrics = containerClientMetrics; - return this; - } - - public ECBlockOutputStreamEntry.Builder setStreamBufferArgs( - StreamBufferArgs args) { - this.streamBufferArgs = args; - return this; - } - + public static class Builder extends BlockOutputStreamEntry.Builder { public ECBlockOutputStreamEntry build() { - return new ECBlockOutputStreamEntry(blockID, - key, - xceiverClientManager, - pipeline, - length, - bufferPool, - token, config, clientMetrics, streamBufferArgs); + return new ECBlockOutputStreamEntry(this); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java index e551605d842d..6eb9aed0d3ad 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java @@ -17,19 +17,7 @@ */ package org.apache.hadoop.ozone.client.io; -import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.scm.ContainerClientMetrics; -import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.StreamBufferArgs; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; - -import java.time.Clock; -import java.time.ZoneOffset; /** * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication @@ -44,37 +32,14 @@ * @see ECBlockOutputStreamEntry */ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool { - - @SuppressWarnings({"parameternumber", "squid:S00107"}) - public ECBlockOutputStreamEntryPool(OzoneClientConfig config, - 
OzoneManagerProtocol omClient, - String requestId, - ReplicationConfig replicationConfig, - String uploadID, - int partNumber, - boolean isMultipart, - OmKeyInfo info, - boolean unsafeByteBufferConversion, - XceiverClientFactory xceiverClientFactory, - long openID, - ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) { - super(config, omClient, requestId, replicationConfig, uploadID, partNumber, - isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory, - openID, clientMetrics, streamBufferArgs); - assert replicationConfig instanceof ECReplicationConfig; - } - - @Override - ExcludeList createExcludeList() { - return new ExcludeList(getConfig().getExcludeNodesExpiryTime(), - Clock.system(ZoneOffset.UTC)); + public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) { + super(builder); } @Override - BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { - return - new ECBlockOutputStreamEntry.Builder() - .setBlockID(subKeyInfo.getBlockID()) + ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { + final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder(); + b.setBlockID(subKeyInfo.getBlockID()) .setKey(getKeyName()) .setXceiverClientManager(getXceiverClientFactory()) .setPipeline(subKeyInfo.getPipeline()) @@ -84,7 +49,8 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) { .setToken(subKeyInfo.getToken()) .setClientMetrics(getClientMetrics()) .setStreamBufferArgs(getStreamBufferArgs()) - .build(); + .setExecutorServiceSupplier(getExecutorServiceSupplier()); + return b.build(); } @Override diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index b5c36474ff9e..0cb3973e0411 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -17,41 +17,16 @@ */ package org.apache.hadoop.ozone.client.io; -import java.io.IOException; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.scm.OzoneClientConfig; -import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; -import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import 
org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.protocol.S3Auth; import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder; import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil; @@ -59,6 +34,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * ECKeyOutputStream handles the EC writes by writing the data into underlying * block output streams chunk by chunk. @@ -74,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream private final int numParityBlks; private final ByteBufferPool bufferPool; private final RawErasureEncoder encoder; - private final ExecutorService flushExecutor; private final Future flushFuture; private final AtomicLong flushCheckpoint; @@ -100,22 +89,6 @@ private enum StripeWriteStatus { private long offset; // how much data has been ingested into the stream private long writeOffset; - private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool; - - @VisibleForTesting - public List getStreamEntries() { - return blockOutputStreamEntryPool.getStreamEntries(); - } - - @VisibleForTesting - public XceiverClientFactory getXceiverClientFactory() { - return blockOutputStreamEntryPool.getXceiverClientFactory(); - } - - @VisibleForTesting - public List getLocationInfoList() { - return blockOutputStreamEntryPool.getLocationInfoList(); - } @VisibleForTesting public void insertFlushCheckpoint(long version) throws IOException { @@ -128,8 +101,7 @@ public long getFlushCheckpoint() { } private ECKeyOutputStream(Builder builder) { - super(builder.getReplicationConfig(), builder.getClientMetrics(), - builder.getClientConfig(), builder.getStreamBufferArgs()); + super(builder.getReplicationConfig(), new ECBlockOutputStreamEntryPool(builder)); this.config = builder.getClientConfig(); this.bufferPool = builder.getByteBufferPool(); // For EC, cell/chunk size and buffer size can be same for now. 
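The stream-entry classes above replace long positional constructors with constructors that consume the Builder itself, and ECBlockOutputStreamEntry.Builder now simply extends BlockOutputStreamEntry.Builder, overriding build() with a covariant return type. A compact sketch of the pattern, using hypothetical Entry/EcEntry names rather than the Ozone classes:

public final class BuilderInheritanceDemo {
  static class Entry {
    private final String key;
    private final long length;

    // The constructor consumes the builder directly, so adding a field means
    // touching only the Builder, not a long positional parameter list.
    Entry(Builder b) {
      this.key = b.key;
      this.length = b.length;
    }

    @Override public String toString() {
      return getClass().getSimpleName() + ":" + key + " len=" + length;
    }

    static class Builder {
      private String key;
      private long length;

      Builder setKey(String k) { this.key = k; return this; }
      Builder setLength(long len) { this.length = len; return this; }
      Entry build() { return new Entry(this); }
    }
  }

  static class EcEntry extends Entry {
    private final int dataBlocks;

    EcEntry(Builder b) { super(b); this.dataBlocks = b.dataBlocks; }

    @Override public String toString() {
      return super.toString() + " data=" + dataBlocks;
    }

    // The subclass builder inherits every setter; only build() is overridden
    // (covariant return), mirroring ECBlockOutputStreamEntry.Builder.
    static class Builder extends Entry.Builder {
      private int dataBlocks = 3; // illustrative EC parameter

      Builder setDataBlocks(int n) { this.dataBlocks = n; return this; }
      @Override EcEntry build() { return new EcEntry(this); }
    }
  }

  public static void main(String[] args) {
    // As in ECBlockOutputStreamEntryPool#createStreamEntry, keep the local
    // typed as the subclass builder: the inherited setters return the base
    // type, so chaining them would lose access to subclass-specific setters.
    EcEntry.Builder b = new EcEntry.Builder();
    b.setKey("vol/bucket/key").setLength(1024);
    Entry e = b.setDataBlocks(3).build();
    System.out.println(e); // EcEntry:vol/bucket/key len=1024 data=3
  }
}

This is also why createStreamEntry above keeps the builder in a local variable and calls b.build() separately: the inherited setters return the base builder type, while the overridden build() still produces the EC entry.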
@@ -140,46 +112,24 @@ private ECKeyOutputStream(Builder builder) { ecChunkSize, numDataBlks, numParityBlks, bufferPool); chunkIndex = 0; ecStripeQueue = new ArrayBlockingQueue<>(config.getEcStripeQueueSize()); - OmKeyInfo info = builder.getOpenHandler().getKeyInfo(); - blockOutputStreamEntryPool = - new ECBlockOutputStreamEntryPool(config, - builder.getOmClient(), builder.getRequestID(), - builder.getReplicationConfig(), - builder.getMultipartUploadID(), builder.getMultipartNumber(), - builder.isMultipartKey(), - info, builder.isUnsafeByteBufferConversionEnabled(), - builder.getXceiverManager(), builder.getOpenHandler().getId(), - builder.getClientMetrics(), builder.getStreamBufferArgs()); this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( builder.getReplicationConfig()); - this.flushExecutor = Executors.newSingleThreadExecutor(); S3Auth s3Auth = builder.getS3CredentialsProvider().get(); ThreadLocal s3CredentialsProvider = builder.getS3CredentialsProvider(); - flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth)); - this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue); + this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> { + s3CredentialsProvider.set(s3Auth); + return flushStripeFromQueue(); + }); this.flushCheckpoint = new AtomicLong(0); this.atomicKeyCreation = builder.getAtomicKeyCreation(); } - /** - * When a key is opened, it is possible that there are some blocks already - * allocated to it for this open session. In this case, to make use of these - * blocks, we need to add these blocks to stream entries. But, a key's version - * also includes blocks from previous versions, we need to avoid adding these - * old blocks to stream entries, because these old blocks should not be picked - * for write. To do this, the following method checks that, only those - * blocks created in this particular open version are added to stream entries. - * - * @param version the set of blocks that are pre-allocated. - * @param openVersion the version corresponding to the pre-allocation. 
- * @throws IOException - */ - public void addPreallocateBlocks(OmKeyLocationInfoGroup version, - long openVersion) throws IOException { - blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion); + @Override + protected ECBlockOutputStreamEntryPool getBlockOutputStreamEntryPool() { + return (ECBlockOutputStreamEntryPool) super.getBlockOutputStreamEntryPool(); } /** @@ -218,6 +168,7 @@ private void rollbackAndReset(ECChunkBuffers stripe) throws IOException { final ByteBuffer[] dataBuffers = stripe.getDataBuffers(); offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum(); + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); final ECBlockOutputStreamEntry failedStreamEntry = blockOutputStreamEntryPool.getCurrentStreamEntry(); failedStreamEntry.resetToFirstEntry(); @@ -256,8 +207,7 @@ private void logStreamError(List failedStreams, private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe) throws IOException { - ECBlockOutputStreamEntry streamEntry = - blockOutputStreamEntryPool.getCurrentStreamEntry(); + final ECBlockOutputStreamEntry streamEntry = getBlockOutputStreamEntryPool().getCurrentStreamEntry(); List failedStreams = streamEntry.streamsWithWriteFailure(); if (!failedStreams.isEmpty()) { @@ -297,6 +247,7 @@ private void excludePipelineAndFailedDN(Pipeline pipeline, List failedStreams) { // Exclude the failed pipeline + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId()); // If the failure is NOT caused by other reasons (e.g. container full), @@ -362,6 +313,7 @@ private void generateParityCells() throws IOException { } private void writeDataCells(ECChunkBuffers stripe) throws IOException { + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); blockOutputStreamEntryPool.allocateBlockIfNeeded(); ByteBuffer[] dataCells = stripe.getDataBuffers(); for (int i = 0; i < numDataBlks; i++) { @@ -374,6 +326,7 @@ private void writeDataCells(ECChunkBuffers stripe) throws IOException { private void writeParityCells(ECChunkBuffers stripe) { // Move the stream entry cursor to parity block index + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); blockOutputStreamEntryPool .getCurrentStreamEntry().forceToFirstParityBlock(); ByteBuffer[] parityCells = stripe.getParityBuffers(); @@ -413,7 +366,7 @@ private void handleOutputStreamWrite(ByteBuffer buffer, boolean isParity) { // The len cannot be bigger than cell buffer size. 
assert buffer.limit() <= ecChunkSize : "The buffer size: " + buffer.limit() + " should not exceed EC chunk size: " + ecChunkSize; - writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(), + writeToOutputStream(getBlockOutputStreamEntryPool().getCurrentStreamEntry(), buffer.array(), buffer.limit(), 0, isParity); } catch (Exception e) { markStreamAsFailed(e); @@ -449,8 +402,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, Preconditions.checkNotNull(t); boolean containerExclusionException = checkIfContainerToExclude(t); if (containerExclusionException) { - blockOutputStreamEntryPool.getExcludeList() - .addPipeline(streamEntry.getPipeline().getId()); + getBlockOutputStreamEntryPool().getExcludeList().addPipeline(streamEntry.getPipeline().getId()); } markStreamAsFailed(exception); } @@ -460,7 +412,7 @@ private void markStreamClosed() { } private void markStreamAsFailed(Exception e) { - blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e); + getBlockOutputStreamEntryPool().getCurrentStreamEntry().markFailed(e); } @Override @@ -470,6 +422,7 @@ public void flush() { private void closeCurrentStreamEntry() throws IOException { + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); if (!blockOutputStreamEntryPool.isEmpty()) { while (true) { try { @@ -503,6 +456,7 @@ public void close() throws IOException { return; } closed = true; + final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool(); try { if (!closing) { // If stripe buffer is not empty, encode and flush the stripe. @@ -539,7 +493,6 @@ public void close() throws IOException { } catch (InterruptedException e) { throw new IOException("Flushing thread was interrupted", e); } finally { - flushExecutor.shutdownNow(); closeCurrentStreamEntry(); blockOutputStreamEntryPool.cleanup(); } @@ -614,20 +567,6 @@ public static void padBufferToLimit(ByteBuffer buf, int limit) { buf.position(limit); } - public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - return blockOutputStreamEntryPool.getCommitUploadPartInfo(); - } - - @VisibleForTesting - public ExcludeList getExcludeList() { - return blockOutputStreamEntryPool.getExcludeList(); - } - - @Override - public Map getMetadata() { - return this.blockOutputStreamEntryPool.getMetadata(); - } - /** * Builder class of ECKeyOutputStream. 
  */
@@ -682,9 +621,8 @@ public ECKeyOutputStream build() {
    */
   private void checkNotClosed() throws IOException {
     if (closing || closed) {
-      throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
-              + blockOutputStreamEntryPool.getKeyName());
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+          + getBlockOutputStreamEntryPool().getKeyName());
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 8b128e9cd945..d9e735cd7c8c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -69,7 +71,6 @@
 public class KeyOutputStream extends OutputStream
     implements Syncable, KeyMetadataAware {
 
-  private OzoneClientConfig config;
   private final ReplicationConfig replication;
 
   /**
@@ -105,11 +106,8 @@ enum StreamAction {
    */
   private boolean atomicKeyCreation;
 
-  public KeyOutputStream(ReplicationConfig replicationConfig,
-      ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
-      StreamBufferArgs streamBufferArgs) {
+  public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
-    this.config = clientConfig;
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
@@ -117,18 +115,16 @@ public KeyOutputStream(ReplicationConfig replicationConfig,
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs);
+    this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
   }
 
-  @VisibleForTesting
-  public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
+  protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+    return blockOutputStreamEntryPool;
   }
 
   @VisibleForTesting
-  public XceiverClientFactory getXceiverClientFactory() {
-    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
   }
 
   @VisibleForTesting
@@ -146,39 +142,18 @@ public long getClientID() {
     return clientID;
   }
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public KeyOutputStream(
-      OzoneClientConfig config,
-      OpenKeySession handler,
-      XceiverClientFactory xceiverClientManager,
-      OzoneManagerProtocol omClient,
-      String requestId, ReplicationConfig replicationConfig,
-      String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion,
-      ContainerClientMetrics clientMetrics,
-      boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
-    this.replication = replicationConfig;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(
-            config,
-            omClient,
-            requestId, replicationConfig,
-            uploadID, partNumber,
-            isMultipart, handler.getKeyInfo(),
-            unsafeByteBufferConversion,
-            xceiverClientManager,
-            handler.getId(),
-            clientMetrics, streamBufferArgs);
+  public KeyOutputStream(Builder b) {
+    this.replication = b.replicationConfig;
+    this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
+    final OzoneClientConfig config = b.getClientConfig();
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
     this.isException = false;
     this.writeOffset = 0;
-    this.clientID = handler.getId();
-    this.atomicKeyCreation = atomicKeyCreation;
-    this.streamBufferArgs = streamBufferArgs;
+    this.clientID = b.getOpenHandler().getId();
+    this.atomicKeyCreation = b.getAtomicKeyCreation();
+    this.streamBufferArgs = b.getStreamBufferArgs();
   }
 
   /**
@@ -192,10 +167,8 @@ public KeyOutputStream(
    *
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
    */
-  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
+  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
     blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
 
@@ -615,6 +588,7 @@ public static class Builder {
     private ContainerClientMetrics clientMetrics;
     private boolean atomicKeyCreation = false;
     private StreamBufferArgs streamBufferArgs;
+    private Supplier<ExecutorService> executorServiceSupplier;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -728,21 +702,17 @@ public boolean getAtomicKeyCreation() {
       return atomicKeyCreation;
     }
 
+    public Builder setExecutorServiceSupplier(Supplier<ExecutorService> executorServiceSupplier) {
+      this.executorServiceSupplier = executorServiceSupplier;
+      return this;
+    }
+
+    public Supplier<ExecutorService> getExecutorServiceSupplier() {
+      return executorServiceSupplier;
+    }
+
     public KeyOutputStream build() {
-      return new KeyOutputStream(
-          clientConfig,
-          openHandler,
-          xceiverManager,
-          omClient,
-          requestID,
-          replicationConfig,
-          multipartUploadID,
-          multipartNumber,
-          isMultipartKey,
-          unsafeByteBufferConversion,
-          clientMetrics,
-          atomicKeyCreation,
-          streamBufferArgs);
+      return new KeyOutputStream(this);
     }
   }
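
Reviewer note (not part of the patch): the Builder now carries a
Supplier<ExecutorService>, so each stream borrows a shared response executor
instead of creating and shutting down its own single-thread pool. A minimal,
self-contained sketch of that ownership model; the class and variable names
below are illustrative only, not Ozone APIs:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;

    public final class SharedExecutorSketch {
      public static void main(String[] args) {
        // The client owns a single pool; streams only borrow it via the supplier.
        ExecutorService shared = Executors.newCachedThreadPool();
        Supplier<ExecutorService> provider = () -> shared;

        // Each "stream" submits its response handling to the borrowed pool.
        provider.get().submit(() -> System.out.println("ack for stream 1"));
        provider.get().submit(() -> System.out.println("ack for stream 2"));

        // Only the owner shuts the pool down, once, when the client closes.
        shared.shutdown();
      }
    }

Shutdown responsibility accordingly moves out of the per-stream cleanup and
into RpcClient.close(), as the next file shows.
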
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 0be238257526..953b8489a1b4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -145,6 +145,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -196,6 +197,8 @@ public class RpcClient implements ClientProtocol {
   // for reconstruction.
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+  private static final int WRITE_POOL_MIN_SIZE = 1;
+
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
   private final XceiverClientFactory xceiverClientManager;
@@ -214,8 +217,9 @@ public class RpcClient implements ClientProtocol {
   private final ByteBufferPool byteBufferPool;
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final OzoneManagerVersion omVersion;
-  private volatile ExecutorService ecReconstructExecutor;
+  private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
   private final ContainerClientMetrics clientMetrics;
+  private final MemoizedSupplier<ExecutorService> writeExecutor;
   private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
 
   /**
@@ -238,6 +242,11 @@ public RpcClient(ConfigurationSource conf, String omServiceId)
     this.groupRights = aclConfig.getGroupDefaultRights();
 
     this.clientConfig = conf.getObject(OzoneClientConfig.class);
+    this.ecReconstructExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor(
+        EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE, clientConfig.getEcReconstructStripeReadPoolLimit(),
+        "ec-reconstruct-reader-TID-%d"));
+    this.writeExecutor = MemoizedSupplier.valueOf(() -> createThreadPoolExecutor(
+        WRITE_POOL_MIN_SIZE, Integer.MAX_VALUE, "client-write-TID-%d"));
 
     OmTransport omTransport = createOmTransport(omServiceId);
     OzoneManagerProtocolClientSideTranslatorPB
@@ -312,7 +321,7 @@ public void onRemoval(
         }).build();
     this.byteBufferPool = new ElasticByteBufferPool();
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
-        .getInstance(byteBufferPool, this::getECReconstructExecutor);
+        .getInstance(byteBufferPool, ecReconstructExecutor);
     this.clientMetrics = ContainerClientMetrics.acquire();
   }
 
@@ -1737,9 +1746,11 @@ private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws IOException {
 
   @Override
   public void close() throws IOException {
-    if (ecReconstructExecutor != null) {
-      ecReconstructExecutor.shutdownNow();
-      ecReconstructExecutor = null;
+    if (ecReconstructExecutor.isInitialized()) {
+      ecReconstructExecutor.get().shutdownNow();
+    }
+    if (writeExecutor.isInitialized()) {
+      writeExecutor.get().shutdownNow();
     }
     IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
     keyProviderCache.invalidateAll();
@@ -2359,6 +2370,7 @@ private KeyOutputStream.Builder createKeyOutputStream(
         .setConfig(clientConfig)
         .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics)
+        .setExecutorServiceSupplier(writeExecutor)
         .setStreamBufferArgs(streamBufferArgs);
   }
 
@@ -2480,26 +2492,11 @@ public void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
     ozoneManagerClient.setTimes(builder.build(), mtime, atime);
   }
 
-  public ExecutorService getECReconstructExecutor() {
-    // local ref to a volatile to ensure access
-    // to a completed initialized object
-    ExecutorService executor = ecReconstructExecutor;
-    if (executor == null) {
-      synchronized (this) {
-        executor = ecReconstructExecutor;
-        if (executor == null) {
-          ecReconstructExecutor = new ThreadPoolExecutor(
-              EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
-              clientConfig.getEcReconstructStripeReadPoolLimit(),
-              60, TimeUnit.SECONDS, new SynchronousQueue<>(),
-              new ThreadFactoryBuilder()
-                  .setNameFormat("ec-reconstruct-reader-TID-%d")
-                  .build(),
-              new ThreadPoolExecutor.CallerRunsPolicy());
-          executor = ecReconstructExecutor;
-        }
-      }
-    }
-    return executor;
+  private static ExecutorService createThreadPoolExecutor(
+      int corePoolSize, int maximumPoolSize, String threadNameFormat) {
+    return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+        60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat(threadNameFormat).setDaemon(true).build(),
+        new ThreadPoolExecutor.CallerRunsPolicy());
   }
 }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
index 7760e88e484a..718e724e5854 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -63,10 +63,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
@@ -101,10 +101,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index 3e648fd8165c..4f7ae6d410a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -214,7 +214,7 @@ public void testReleaseBuffers() throws Exception {
             return v;
           });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }
 
@@ -288,7 +288,7 @@ public void testReleaseBuffersOnException() throws Exception {
             return v;
           });
       futures.add(future);
-      watcher.getFutureMap().put(length, future);
+      watcher.putFlushFuture(length, future);
       replies.add(reply);
     }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index b8b66c18f802..0d9aedeab04c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -187,6 +187,14 @@ static void shutdown() throws IOException {
     }
   }
 
+  static void reInitClient() throws IOException {
+    ozClient = OzoneClientFactory.getRpcClient(conf);
+    store = ozClient.getObjectStore();
+    TestOzoneRpcClient.setOzClient(ozClient);
+    TestOzoneRpcClient.setStore(store);
+  }
+
   @ParameterizedTest
   @EnumSource
   void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -704,9 +712,7 @@ void testGetKeyProvider() throws Exception {
     KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
     assertNotEquals(kp3, kpSpy);
 
-    // Restore ozClient and store
-    TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
-    TestOzoneRpcClient.setStore(ozClient.getObjectStore());
+    reInitClient();
   }
 
   private static RepeatedOmKeyInfo getMatchedKeyInfo(
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 983516002909..0687a0fb8e2a 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -22,8 +22,6 @@
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -79,10 +77,7 @@ public KeyOutputStream getKeyOutputStream() {
     OzoneConfiguration conf = new OzoneConfiguration();
     ReplicationConfig replicationConfig =
         ReplicationConfig.getDefault(conf);
-    OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
-    StreamBufferArgs streamBufferArgs =
-        StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
-    return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
+    return new KeyOutputStream(replicationConfig, null) {
       @Override
       public synchronized OmMultipartCommitUploadPartInfo
           getCommitUploadPartInfo() {
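
Reviewer note (not part of the patch): org.apache.ratis.util.MemoizedSupplier
encapsulates the double-checked locking that the removed
getECReconstructExecutor() spelled out by hand. A minimal equivalent, given
here only to make the refactoring concrete; Memoized is a hypothetical name,
not the Ratis implementation:

    import java.util.function.Supplier;

    final class Memoized<T> implements Supplier<T> {
      private final Supplier<T> initializer;
      private volatile T value; // volatile publishes the fully built object

      Memoized(Supplier<T> initializer) {
        this.initializer = initializer;
      }

      boolean isInitialized() {
        return value != null;
      }

      @Override
      public T get() {
        T v = value; // single volatile read on the fast path
        if (v == null) {
          synchronized (this) {
            v = value;
            if (v == null) {
              value = v = initializer.get(); // compute at most once
            }
          }
        }
        return v;
      }
    }

RpcClient.close() can then consult isInitialized() and skip shutdownNow() for
pools that were never used, as in the close() hunk above.
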