diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java new file mode 100644 index 000000000000..2a79edbe31eb --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.ozone.OzoneConfigKeys; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Configuration values for Ozone Client. + */ +@ConfigGroup(prefix = "ozone.client") +public class OzoneClientConfig { + + private static final Logger LOG = + LoggerFactory.getLogger(OzoneClientConfig.class); + + @Config(key = "stream.buffer.flush.size", + defaultValue = "16MB", + type = ConfigType.SIZE, + description = "Size which determines at what buffer position a partial " + + "flush will be initiated during write. It should be a multiple of" + + " ozone.client.stream.buffer.size", + tags = ConfigTag.CLIENT) + private long streamBufferFlushSize = 16 * 1024 * 1024; + + @Config(key = "stream.buffer.size", + defaultValue = "4MB", + type = ConfigType.SIZE, + description = "The size of chunks the client will send to the server", + tags = ConfigTag.CLIENT) + private int streamBufferSize = 4 * 1024 * 1024; + + @Config(key = "stream.buffer.flush.delay", + defaultValue = "true", + description = "Default true, when call flush() and determine whether " + + "the data in the current buffer is greater than ozone.client" + + ".stream.buffer.size, if greater than then send buffer to the " + + "datanode. You can turn this off by setting this configuration " + + "to false.", tags = ConfigTag.CLIENT) + private boolean streamBufferFlushDelay = true; + + @Config(key = "stream.buffer.max.size", + defaultValue = "32MB", + type = ConfigType.SIZE, + description = "Size which determines at what buffer position write call" + + " be blocked till acknowledgement of the first partial flush " + + "happens by all servers.", + tags = ConfigTag.CLIENT) + private long streamBufferMaxSize = 32 * 1024 * 1024; + + @Config(key = "max.retries", + defaultValue = "5", + description = "Maximum number of retries by Ozone Client on " + + "encountering exception while writing a key", + tags = ConfigTag.CLIENT) + private int maxRetryCount = 5; + + @Config(key = "retry.interval", + defaultValue = "0", + description = + "Indicates the time duration a client will wait before retrying a " + + "write key request on encountering an exception. By default " + + "there is no wait", + tags = ConfigTag.CLIENT) + private int retryInterval = 0; + + @Config(key = "checksum.type", + defaultValue = "CRC32", + description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] " + + "determines which algorithm would be used to compute checksum for " + + "chunk data. Default checksum type is CRC32.", + tags = ConfigTag.CLIENT) + private String checksumType = ChecksumType.CRC32.name(); + + @Config(key = "bytes.per.checksum", + defaultValue = "1MB", + type = ConfigType.SIZE, + description = "Checksum will be computed for every bytes per checksum " + + "number of bytes and stored sequentially. The minimum value for " + + "this config is 256KB.", + tags = ConfigTag.CLIENT) + private int bytesPerChecksum = 1024 * 1024; + + @Config(key = "verify.checksum", + defaultValue = "true", + description = "Ozone client to verify checksum of the checksum " + + "blocksize data.", + tags = ConfigTag.CLIENT) + private boolean checksumVerify = true; + + public OzoneClientConfig() { + } + + private void validate() { + Preconditions.checkState(streamBufferSize > 0); + Preconditions.checkState(streamBufferFlushSize > 0); + Preconditions.checkState(streamBufferMaxSize > 0); + + Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0, + "expected max. buffer size (%s) to be a multiple of flush size (%s)", + streamBufferMaxSize, streamBufferFlushSize); + Preconditions.checkState(streamBufferFlushSize % streamBufferSize == 0, + "expected flush size (%s) to be a multiple of buffer size (%s)", + streamBufferFlushSize, streamBufferSize); + + if (bytesPerChecksum < + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { + LOG.warn("The checksum size ({}) is not allowed to be less than the " + + "minimum size ({}), resetting to the minimum size.", + bytesPerChecksum, + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); + bytesPerChecksum = + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; + } + + } + + public long getStreamBufferFlushSize() { + return streamBufferFlushSize; + } + + public void setStreamBufferFlushSize(long streamBufferFlushSize) { + this.streamBufferFlushSize = streamBufferFlushSize; + } + + public int getStreamBufferSize() { + return streamBufferSize; + } + + public void setStreamBufferSize(int streamBufferSize) { + this.streamBufferSize = streamBufferSize; + } + + public boolean isStreamBufferFlushDelay() { + return streamBufferFlushDelay; + } + + public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) { + this.streamBufferFlushDelay = streamBufferFlushDelay; + } + + public long getStreamBufferMaxSize() { + return streamBufferMaxSize; + } + + public void setStreamBufferMaxSize(long streamBufferMaxSize) { + this.streamBufferMaxSize = streamBufferMaxSize; + } + + public int getMaxRetryCount() { + return maxRetryCount; + } + + public void setMaxRetryCount(int maxRetryCount) { + this.maxRetryCount = maxRetryCount; + } + + public int getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(int retryInterval) { + this.retryInterval = retryInterval; + } + + public ChecksumType getChecksumType() { + return ChecksumType.valueOf(checksumType); + } + + public void setChecksumType(ChecksumType checksumType) { + this.checksumType = checksumType.name(); + } + + public int getBytesPerChecksum() { + return bytesPerChecksum; + } + + public void setBytesPerChecksum(int bytesPerChecksum) { + this.bytesPerChecksum = bytesPerChecksum; + } + + public boolean isChecksumVerify() { + return checksumVerify; + } + + public void setChecksumVerify(boolean checksumVerify) { + this.checksumVerify = checksumVerify; + } + +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index e048708107e2..e29bbe3663e0 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -34,9 +34,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -85,13 +85,10 @@ public class BlockOutputStream extends OutputStream { private final BlockData.Builder containerBlockData; private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; - private final int bytesPerChecksum; + private OzoneClientConfig config; + private int chunkIndex; private final AtomicLong chunkOffset = new AtomicLong(); - private final int streamBufferSize; - private final long streamBufferFlushSize; - private final boolean streamBufferFlushDelay; - private final long streamBufferMaxSize; private final BufferPool bufferPool; // The IOException will be set by response handling thread in case there is an // exception received in the response. If the exception is set, the next @@ -133,46 +130,39 @@ public class BlockOutputStream extends OutputStream { * Creates a new BlockOutputStream. * * @param blockID block ID - * @param xceiverClientFactory client manager that controls client + * @param xceiverClientManager client manager that controls client * @param pipeline pipeline where block will be written * @param bufferPool pool of buffers - * @param streamBufferFlushSize flush size - * @param streamBufferMaxSize max size of the currentBuffer - * @param checksumType checksum type - * @param bytesPerChecksum Bytes per checksum - * @param token a token for this block (may be null) */ - @SuppressWarnings("parameternumber") - public BlockOutputStream(BlockID blockID, - XceiverClientFactory xceiverClientFactory, Pipeline pipeline, - int streamBufferSize, long streamBufferFlushSize, - boolean streamBufferFlushDelay, long streamBufferMaxSize, - BufferPool bufferPool, ChecksumType checksumType, - int bytesPerChecksum, Token token) - throws IOException { + public BlockOutputStream( + BlockID blockID, + XceiverClientFactory xceiverClientManager, + Pipeline pipeline, + BufferPool bufferPool, + OzoneClientConfig config, + Token token + ) throws IOException { + this.xceiverClientFactory = xceiverClientManager; + this.config = config; this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); this.containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); - this.xceiverClientFactory = xceiverClientFactory; - this.xceiverClient = xceiverClientFactory.acquireClient(pipeline); - this.streamBufferSize = streamBufferSize; - this.streamBufferFlushSize = streamBufferFlushSize; - this.streamBufferMaxSize = streamBufferMaxSize; - this.streamBufferFlushDelay = streamBufferFlushDelay; + this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; - this.bytesPerChecksum = bytesPerChecksum; this.token = token; //number of buffers used before doing a flush refreshCurrentBuffer(bufferPool); - flushPeriod = (int) (streamBufferFlushSize / streamBufferSize); + flushPeriod = (int) (config.getStreamBufferFlushSize() / config + .getStreamBufferSize()); Preconditions .checkArgument( - (long) flushPeriod * streamBufferSize == streamBufferFlushSize); + (long) flushPeriod * config.getStreamBufferSize() == config + .getStreamBufferFlushSize()); // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); @@ -182,7 +172,8 @@ public BlockOutputStream(BlockID blockID, writtenDataLength = 0; failedServers = new ArrayList<>(0); ioException = new AtomicReference<>(null); - checksum = new Checksum(checksumType, bytesPerChecksum); + checksum = new Checksum(config.getChecksumType(), + config.getBytesPerChecksum()); } private void refreshCurrentBuffer(BufferPool pool) { @@ -290,7 +281,7 @@ private void doFlushOrWatchIfNeeded() throws IOException { private void allocateNewBufferIfNeeded() { if (currentBufferRemaining == 0) { - currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum); + currentBuffer = bufferPool.allocateBuffer(config.getBytesPerChecksum()); currentBufferRemaining = currentBuffer.remaining(); } } @@ -300,7 +291,7 @@ private void updateFlushLength() { } private boolean isBufferPoolFull() { - return bufferPool.computeBufferData() == streamBufferMaxSize; + return bufferPool.computeBufferData() == config.getStreamBufferMaxSize(); } /** @@ -318,7 +309,7 @@ public void writeOnRetry(long len) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Retrying write length {} for blockID {}", len, blockID); } - Preconditions.checkArgument(len <= streamBufferMaxSize); + Preconditions.checkArgument(len <= config.getStreamBufferMaxSize()); int count = 0; while (len > 0) { ChunkBuffer buffer = bufferPool.getBuffer(count); @@ -334,13 +325,13 @@ public void writeOnRetry(long len) throws IOException { // the buffer. We should just validate // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to // call for handling full buffer/flush buffer condition. - if (writtenDataLength % streamBufferFlushSize == 0) { + if (writtenDataLength % config.getStreamBufferFlushSize() == 0) { // reset the position to zero as now we will be reading the // next buffer in the list updateFlushLength(); executePutBlock(false, false); } - if (writtenDataLength == streamBufferMaxSize) { + if (writtenDataLength == config.getStreamBufferMaxSize()) { handleFullBuffer(); } } @@ -486,8 +477,9 @@ ContainerCommandResponseProto> executePutBlock(boolean close, public void flush() throws IOException { if (xceiverClientFactory != null && xceiverClient != null && bufferPool != null && bufferPool.getSize() > 0 - && (!streamBufferFlushDelay || - writtenDataLength - totalDataFlushedLength >= streamBufferSize)) { + && (!config.isStreamBufferFlushDelay() || + writtenDataLength - totalDataFlushedLength + >= config.getStreamBufferSize())) { try { handleFlush(false); } catch (ExecutionException e) { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 545c5889bceb..71d04a00e643 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -97,17 +98,21 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) Mockito.when(xcm.acquireClient(Mockito.any())) .thenReturn(new MockXceiverClientSpi(pipeline)); + OzoneClientConfig config = new OzoneClientConfig(); + config.setStreamBufferSize(4 * 1024 * 1024); + config.setStreamBufferMaxSize(32 * 1024 * 1024); + config.setStreamBufferFlushDelay(true); + config.setStreamBufferFlushSize(16 * 1024 * 1024); + config.setChecksumType(ChecksumType.NONE); + config.setBytesPerChecksum(256 * 1024); + BlockOutputStream outputStream = new BlockOutputStream( new BlockID(1L, 1L), xcm, pipeline, - 4 * 1024 * 1024, - 16 * 1024 * 1024, - true, - 32 * 1024 * 1024, bufferPool, - ChecksumType.NONE, - 256 * 1024, null); + config, + null); return outputStream; } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java index 94ec157ad059..eea8e1f14ef4 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hdds.scm.storage; +import java.io.EOFException; +import java.util.Random; + import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.test.GenericTestUtils; @@ -28,9 +30,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.EOFException; -import java.util.Random; - /** * Tests for {@link ChunkInputStream}'s functionality. */ @@ -48,9 +47,7 @@ public class TestChunkInputStream { @Before public void setup() throws Exception { - checksum = new Checksum(ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT), - BYTES_PER_CHECKSUM); + checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM); chunkData = generateRandomData(CHUNK_SIZE); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index eecc97595090..f681d0dd2d9b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CloseContainerRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -330,7 +331,8 @@ public static PutSmallFileResponseProto writeSmallFile( KeyValue keyValue = KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true") .build(); - Checksum checksum = new Checksum(); + + Checksum checksum = new Checksum(ChecksumType.CRC32, 256); final ChecksumData checksumData = checksum.computeChecksum(data); ChunkInfo chunk = ChunkInfo.newBuilder() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 482ac88f366c..a448e1a120e6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ozone; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -117,43 +115,6 @@ public final class OzoneConfigKeys { * */ public static final String OZONE_ADMINISTRATORS_WILDCARD = "*"; - public static final String OZONE_CLIENT_STREAM_BUFFER_SIZE = - "ozone.client.stream.buffer.size"; - - public static final String OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT = - "4MB"; - - public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE = - "ozone.client.stream.buffer.flush.size"; - - public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT = - "16MB"; - - public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE = - "ozone.client.stream.buffer.max.size"; - - public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = - "32MB"; - - public static final String OZONE_CLIENT_MAX_RETRIES = - "ozone.client.max.retries"; - public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5; - public static final String OZONE_CLIENT_RETRY_INTERVAL = - "ozone.client.retry.interval"; - public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = - TimeDuration.valueOf(0, TimeUnit.MILLISECONDS); - - /** - * If this value is true, when the client calls the flush() method, - * it checks whether the data in the buffer is greater than - * OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT. If greater than, - * send the data in the buffer to the datanode. - * */ - public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY = - "ozone.client.stream.buffer.flush.delay"; - public static final boolean OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT = - true; - // This defines the overall connection limit for the connection pool used in // RestClient. public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX = @@ -354,21 +315,9 @@ public final class OzoneConfigKeys { public static final String OZONE_CONTAINER_COPY_WORKDIR = "hdds.datanode.replication.work.dir"; - /** - * Config properties to set client side checksum properties. - */ - public static final String OZONE_CLIENT_CHECKSUM_TYPE = - "ozone.client.checksum.type"; - public static final String OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT = "CRC32"; - public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM = - "ozone.client.bytes.per.checksum"; - public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT = "1MB"; - public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES = - 1024 * 1024; + public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024; - public static final String OZONE_CLIENT_VERIFY_CHECKSUM = - "ozone.client.verify.checksum"; - public static final boolean OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT = true; + public static final String OZONE_CLIENT_READ_TIMEOUT = "ozone.client.read.timeout"; public static final String OZONE_CLIENT_READ_TIMEOUT_DEFAULT = "30s"; @@ -459,16 +408,21 @@ public final class OzoneConfigKeys { "ssl.server.keystore.location"; public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_LOCATION_KEY = "ssl.server.truststore.location"; - public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY = + public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY = "ssl.server.truststore.password"; - public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = + public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "ozone.https.client.keystore.resource"; - public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = + public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml"; - public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY = + public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY = "ozone.https.client.need-auth"; public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; + public static final String OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY = + "ozone.om.keyname.character.check.enabled"; + public static final boolean OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT = + false; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index 98974ee0a50d..d86f7b1c40c1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.ozone.common; -import com.google.common.annotations.VisibleForTesting; - import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -27,11 +25,11 @@ import java.util.function.Function; import java.util.function.Supplier; -import com.google.common.primitives.Ints; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumType; -import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,18 +108,6 @@ public Checksum(ChecksumType type, int bytesPerChecksum) { this.bytesPerChecksum = bytesPerChecksum; } - /** - * Constructs a Checksum object with default ChecksumType and default - * BytesPerChecksum. - */ - @VisibleForTesting - public Checksum() { - this.checksumType = ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - this.bytesPerChecksum = OzoneConfigKeys - .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB - } - /** * Computes checksum for give data. * @param data input data. diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 485397819fcd..66fc8ca7f673 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -385,59 +385,6 @@ Connection timeout for Ozone client in milliseconds. - - ozone.client.stream.buffer.flush.delay - true - OZONE, CLIENT - - Default true, when call flush() and determine whether the data in the - current buffer is greater than ozone.client.stream.buffer.size, if - greater than then send buffer to the datanode. You can turn this off - by setting this configuration to false. - - - - ozone.client.stream.buffer.size - 4MB - OZONE, CLIENT - The size of chunks the client will send to the server. - - - - ozone.client.stream.buffer.flush.size - 16MB - OZONE, CLIENT - Size which determines at what buffer position a partial - flush will be initiated during write. It should be a multiple - of ozone.client.stream.buffer.size. - - - - ozone.client.stream.buffer.max.size - 32MB - OZONE, CLIENT - Size which determines at what buffer position - write call be blocked till acknowledgement of the first partial flush - happens by all servers. - - - - ozone.client.max.retries - 5 - OZONE, CLIENT - Maximum number of retries by Ozone Client on encountering - exception while writing a key. - - - - ozone.client.retry.interval - 0ms - OZONE, CLIENT - Indicates the time duration a client will wait before - retrying a write key request on encountering an exception. By default - there is no wait. - - ozone.client.socket.timeout 5000ms @@ -1543,34 +1490,6 @@ - - ozone.client.checksum.type - CRC32 - OZONE, CLIENT, MANAGEMENT - The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] determines - which algorithm would be used to compute checksum for chunk data. - Default checksum type is CRC32. - - - - - ozone.client.bytes.per.checksum - 1MB - OZONE, CLIENT, MANAGEMENT - Checksum will be computed for every bytes per checksum number - of bytes and stored sequentially. The minimum value for this config is - 256KB. - - - - - ozone.client.verify.checksum - true - OZONE, CLIENT, MANAGEMENT - - Ozone client to verify checksum of the checksum blocksize data. - - ozone.client.read.timeout diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java index 6d25cf8fc12a..469faac7444b 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.hdds.ratis; +import java.util.Random; +import java.util.UUID; +import java.util.function.BiFunction; + import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; @@ -29,14 +34,11 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; + import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.Assert; import org.junit.Test; -import java.util.Random; -import java.util.UUID; -import java.util.function.BiFunction; - /** Testing {@link ContainerCommandRequestMessage}. */ public class TestContainerCommandRequestMessage { static final Random RANDOM = new Random(); @@ -51,7 +53,8 @@ static ByteString newData(int length) { static ChecksumData checksum(ByteString data) { try { - return new Checksum().computeChecksum(data.asReadOnlyByteBuffer()); + return new Checksum(ChecksumType.CRC32, 1024 * 1024) + .computeChecksum(data.asReadOnlyByteBuffer()); } catch (OzoneChecksumException e) { throw new IllegalStateException(e); } diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java index 653300e96c22..229390e2d84b 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java @@ -94,6 +94,16 @@ public static void injectConfigurationToObject(ConfigurationSource from, forcedFieldSet(field, configuration, from.getTimeDuration(key, "0s", configAnnotation.timeUnit())); break; + case SIZE: + final long value = + Math.round(from.getStorageSize(key, "0b", StorageUnit.BYTES)); + if (field.getType() == int.class) { + forcedFieldSet(field, configuration, (int) value); + } else { + forcedFieldSet(field, configuration, value); + + } + break; case CLASS: forcedFieldSet(field, configuration, from.getClass(key, Object.class)); @@ -233,6 +243,10 @@ private static void updateConfigurationFromObject( config.setTimeDuration(key, field.getLong(configObject), configAnnotation.timeUnit()); break; + case SIZE: + config.setStorageSize(key, field.getLong(configObject), + StorageUnit.BYTES); + break; case CLASS: Object valueClass = field.get(configObject); if (valueClass instanceof Class) { diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java index 12903cf702ae..acc6614dab3e 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java @@ -44,6 +44,10 @@ default void setTimeDuration(String name, long value, TimeUnit unit) { set(name, value + ParsedTimeDuration.unitFor(unit).suffix()); } + default void setStorageSize(String name, long value, StorageUnit unit) { + set(name, value + unit.getShortName()); + } + default void setFromObject(T object) { ConfigGroup configGroup = object.getClass().getAnnotation(ConfigGroup.class); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index b8ebaecf282d..5b25dba6e957 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -25,11 +25,11 @@ import java.util.Map; import java.util.Random; -import com.google.common.base.Strings; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; @@ -49,6 +49,7 @@ import org.apache.hadoop.security.token.Token; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; @@ -109,7 +110,8 @@ public static ChunkBuffer getData(int len) { */ public static void setDataChecksum(ChunkInfo info, ChunkBuffer data) throws OzoneChecksumException { - Checksum checksum = new Checksum(); + Checksum checksum = new Checksum(ChecksumType.CRC32, + 1024 * 1024); info.setChecksumData(checksum.computeChecksum(data)); data.rewind(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 785ed8bc52f9..22fa88fd05a7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -17,11 +17,23 @@ package org.apache.hadoop.ozone.container.common.impl; -import com.google.common.collect.Maps; -import org.apache.commons.io.FileUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -37,22 +49,31 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; +import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; + +import com.google.common.collect.Maps; +import org.apache.commons.io.FileUtils; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import static org.junit.Assert.fail; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -64,26 +85,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; -import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; -import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; -import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; -import static org.junit.Assert.fail; - /** * Simple tests to verify that container persistence works as expected. Some of * these tests are specific to {@link KeyValueContainer}. If a new {@link @@ -423,7 +424,7 @@ public void testWritReadManyChunks() throws IOException { Path dataDir = Paths.get(cNewData.getChunksPath()); // Read chunk via file system and verify. - Checksum checksum = new Checksum(); + Checksum checksum = new Checksum(ChecksumType.CRC32, 1024 * 1024); // Read chunk via ReadChunk call. for (int x = 0; x < chunkCount; x++) { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 81bac07eab6f..8e90c54ee920 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -24,12 +24,13 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.common.annotations.VisibleForTesting; @@ -39,53 +40,40 @@ * */ public final class BlockOutputStreamEntry extends OutputStream { + private final OzoneClientConfig config; private OutputStream outputStream; private BlockID blockID; private final String key; private final XceiverClientFactory xceiverClientManager; private final Pipeline pipeline; - private final ChecksumType checksumType; - private final int bytesPerChecksum; - private final int chunkSize; // total number of bytes that should be written to this stream private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; private final Token token; - private final int streamBufferSize; - private final long streamBufferFlushSize; - private final boolean streamBufferFlushDelay; - private final long streamBufferMaxSize; - private final long watchTimeout; private BufferPool bufferPool; @SuppressWarnings({"parameternumber", "squid:S00107"}) - private BlockOutputStreamEntry(BlockID blockID, String key, + private BlockOutputStreamEntry( + BlockID blockID, String key, XceiverClientFactory xceiverClientManager, - Pipeline pipeline, String requestId, int chunkSize, - long length, int streamBufferSize, long streamBufferFlushSize, - boolean streamBufferFlushDelay, long streamBufferMaxSize, - long watchTimeout, BufferPool bufferPool, - ChecksumType checksumType, int bytesPerChecksum, - Token token) { + Pipeline pipeline, + long length, + BufferPool bufferPool, + Token token, + OzoneClientConfig config + ) { + this.config = config; this.outputStream = null; this.blockID = blockID; this.key = key; this.xceiverClientManager = xceiverClientManager; this.pipeline = pipeline; - this.chunkSize = chunkSize; this.token = token; this.length = length; this.currentPosition = 0; - this.streamBufferSize = streamBufferSize; - this.streamBufferFlushSize = streamBufferFlushSize; - this.streamBufferFlushDelay = streamBufferFlushDelay; - this.streamBufferMaxSize = streamBufferMaxSize; - this.watchTimeout = watchTimeout; this.bufferPool = bufferPool; - this.checksumType = checksumType; - this.bytesPerChecksum = bytesPerChecksum; } long getLength() { @@ -108,11 +96,12 @@ long getRemaining() { */ private void checkStream() throws IOException { if (this.outputStream == null) { + if (getToken() != null) { + UserGroupInformation.getCurrentUser().addToken(getToken()); + } this.outputStream = new BlockOutputStream(blockID, xceiverClientManager, - pipeline, streamBufferSize, streamBufferFlushSize, - streamBufferFlushDelay, streamBufferMaxSize, bufferPool, - checksumType, bytesPerChecksum, token); + pipeline, bufferPool, config, token); } } @@ -212,28 +201,10 @@ public static class Builder { private String key; private XceiverClientFactory xceiverClientManager; private Pipeline pipeline; - private String requestId; - private int chunkSize; private long length; - private int streamBufferSize; - private long streamBufferFlushSize; - private boolean streamBufferFlushDelay; - private long streamBufferMaxSize; - private long watchTimeout; private BufferPool bufferPool; private Token token; - private ChecksumType checksumType; - private int bytesPerChecksum; - - public Builder setChecksumType(ChecksumType type) { - this.checksumType = type; - return this; - } - - public Builder setBytesPerChecksum(int bytes) { - this.bytesPerChecksum = bytes; - return this; - } + private OzoneClientConfig config; public Builder setBlockID(BlockID bID) { this.blockID = bID; @@ -257,48 +228,20 @@ public Builder setPipeline(Pipeline ppln) { return this; } - public Builder setRequestId(String request) { - this.requestId = request; - return this; - } - - public Builder setChunkSize(int cSize) { - this.chunkSize = cSize; - return this; - } public Builder setLength(long len) { this.length = len; return this; } - public Builder setStreamBufferSize(int bufferSize) { - this.streamBufferSize = bufferSize; - return this; - } - - public Builder setStreamBufferFlushSize(long bufferFlushSize) { - this.streamBufferFlushSize = bufferFlushSize; - return this; - } - - public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) { - this.streamBufferFlushDelay = bufferFlushDelay; - return this; - } - - public Builder setStreamBufferMaxSize(long bufferMaxSize) { - this.streamBufferMaxSize = bufferMaxSize; - return this; - } - public Builder setWatchTimeout(long timeout) { - this.watchTimeout = timeout; + public Builder setBufferPool(BufferPool pool) { + this.bufferPool = pool; return this; } - public Builder setbufferPool(BufferPool pool) { - this.bufferPool = pool; + public Builder setConfig(OzoneClientConfig clientConfig) { + this.config = clientConfig; return this; } @@ -308,11 +251,13 @@ public Builder setToken(Token bToken) { } public BlockOutputStreamEntry build() { - return new BlockOutputStreamEntry(blockID, key, - xceiverClientManager, pipeline, requestId, chunkSize, - length, streamBufferSize, streamBufferFlushSize, - streamBufferFlushDelay, streamBufferMaxSize, watchTimeout, - bufferPool, checksumType, bytesPerChecksum, token); + return new BlockOutputStreamEntry(blockID, + key, + xceiverClientManager, + pipeline, + length, + bufferPool, + token, config); } } @@ -337,34 +282,10 @@ public Pipeline getPipeline() { return pipeline; } - public int getChunkSize() { - return chunkSize; - } - public long getCurrentPosition() { return currentPosition; } - public int getStreamBufferSize() { - return streamBufferSize; - } - - public long getStreamBufferFlushSize() { - return streamBufferFlushSize; - } - - public boolean getStreamBufferFlushDelay() { - return streamBufferFlushDelay; - } - - public long getStreamBufferMaxSize() { - return streamBufferMaxSize; - } - - public long getWatchTimeout() { - return watchTimeout; - } - public BufferPool getBufferPool() { return bufferPool; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index c9decc387d68..cf7e841fb7b1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -23,14 +23,14 @@ import java.util.List; import java.util.ListIterator; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.BufferPool; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -53,20 +53,12 @@ public class BlockOutputStreamEntryPool { LoggerFactory.getLogger(BlockOutputStreamEntryPool.class); private final List streamEntries; + private final OzoneClientConfig config; private int currentStreamIndex; private final OzoneManagerProtocol omClient; private final OmKeyArgs keyArgs; private final XceiverClientFactory xceiverClientFactory; - private final int chunkSize; private final String requestID; - private final int streamBufferSize; - private final long streamBufferFlushSize; - private final boolean streamBufferFlushDelay; - private final long streamBufferMaxSize; - private final long watchTimeout; - private final long blockSize; - private final int bytesPerChecksum; - private final ContainerProtos.ChecksumType checksumType; private final BufferPool bufferPool; private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private final long openID; @@ -74,17 +66,17 @@ public class BlockOutputStreamEntryPool { @SuppressWarnings({"parameternumber", "squid:S00107"}) public BlockOutputStreamEntryPool( + OzoneClientConfig config, OzoneManagerProtocol omClient, - int chunkSize, String requestId, HddsProtos.ReplicationFactor factor, + String requestId, HddsProtos.ReplicationFactor factor, HddsProtos.ReplicationType type, - int bufferSize, long bufferFlushSize, - boolean bufferFlushDelay, long bufferMaxSize, - long size, long watchTimeout, ContainerProtos.ChecksumType checksumType, - int bytesPerChecksum, String uploadID, int partNumber, + String uploadID, int partNumber, boolean isMultipart, OmKeyInfo info, boolean unsafeByteBufferConversion, XceiverClientFactory xceiverClientFactory, long openID ) { + this.config = config; + this.xceiverClientFactory = xceiverClientFactory; streamEntries = new ArrayList<>(); currentStreamIndex = 0; this.omClient = omClient; @@ -93,38 +85,14 @@ public BlockOutputStreamEntryPool( .setType(type).setFactor(factor).setDataSize(info.getDataSize()) .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID) .setMultipartUploadPartNumber(partNumber).build(); - this.xceiverClientFactory = xceiverClientFactory; - this.chunkSize = chunkSize; this.requestID = requestId; - this.streamBufferSize = bufferSize; - this.streamBufferFlushSize = bufferFlushSize; - this.streamBufferFlushDelay = bufferFlushDelay; - this.streamBufferMaxSize = bufferMaxSize; - this.blockSize = size; - this.watchTimeout = watchTimeout; - this.bytesPerChecksum = bytesPerChecksum; - this.checksumType = checksumType; this.openID = openID; this.excludeList = new ExcludeList(); - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(streamBufferSize > 0); - Preconditions.checkState(streamBufferFlushSize > 0); - Preconditions.checkState(streamBufferMaxSize > 0); - Preconditions.checkState(blockSize > 0); - Preconditions.checkState(blockSize >= streamBufferMaxSize); - Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0, - "expected max. buffer size (%s) to be a multiple of flush size (%s)", - streamBufferMaxSize, streamBufferFlushSize); - Preconditions.checkState(streamBufferFlushSize % streamBufferSize == 0, - "expected flush size (%s) to be a multiple of buffer size (%s)", - streamBufferFlushSize, streamBufferSize); - Preconditions.checkState(chunkSize % streamBufferSize == 0, - "expected chunk size (%s) to be a multiple of buffer size (%s)", - chunkSize, streamBufferSize); this.bufferPool = - new BufferPool(streamBufferSize, - (int) (streamBufferMaxSize / streamBufferSize), + new BufferPool(config.getStreamBufferSize(), + (int) (config.getStreamBufferMaxSize() / config + .getStreamBufferSize()), ByteStringConversion .createByteBufferConversion(unsafeByteBufferConversion)); } @@ -140,19 +108,16 @@ public BlockOutputStreamEntryPool( omClient = null; keyArgs = null; xceiverClientFactory = null; - chunkSize = 0; + config = + new OzoneConfiguration().getObject(OzoneClientConfig.class); + config.setStreamBufferSize(0); + config.setStreamBufferMaxSize(0); + config.setStreamBufferFlushSize(0); + config.setStreamBufferFlushDelay(false); requestID = null; - streamBufferSize = 0; - streamBufferFlushSize = 0; - streamBufferFlushDelay = false; - streamBufferMaxSize = 0; + int chunkSize = 0; bufferPool = new BufferPool(chunkSize, 1); - watchTimeout = 0; - blockSize = 0; - this.checksumType = ContainerProtos.ChecksumType.valueOf( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - this.bytesPerChecksum = OzoneConfigKeys - .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB + currentStreamIndex = 0; openID = -1; excludeList = new ExcludeList(); @@ -189,17 +154,9 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { .setKey(keyArgs.getKeyName()) .setXceiverClientManager(xceiverClientFactory) .setPipeline(subKeyInfo.getPipeline()) - .setRequestId(requestID) - .setChunkSize(chunkSize) + .setConfig(config) .setLength(subKeyInfo.getLength()) - .setStreamBufferSize(streamBufferSize) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferFlushDelay(streamBufferFlushDelay) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setWatchTimeout(watchTimeout) - .setbufferPool(bufferPool) - .setChecksumType(checksumType) - .setBytesPerChecksum(bytesPerChecksum) + .setBufferPool(bufferPool) .setToken(subKeyInfo.getToken()); streamEntries.add(builder.build()); } @@ -363,10 +320,6 @@ public ExcludeList getExcludeList() { return excludeList; } - public long getStreamBufferMaxSize() { - return streamBufferMaxSize; - } - boolean isEmpty() { return streamEntries.isEmpty(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 03cdb721d51d..b2a4e9211948 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -29,9 +29,9 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -67,6 +67,8 @@ */ public class KeyOutputStream extends OutputStream { + private OzoneClientConfig config; + /** * Defines stream action while calling handleFlushOrClose. */ @@ -126,29 +128,33 @@ public int getRetryCount() { } @SuppressWarnings({"parameternumber", "squid:S00107"}) - public KeyOutputStream(OpenKeySession handler, + public KeyOutputStream( + OzoneClientConfig config, + OpenKeySession handler, XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, - int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay, - long bufferMaxSize, long size, long watchTimeout, - ChecksumType checksumType, int bytesPerChecksum, String uploadID, int partNumber, boolean isMultipart, - int maxRetryCount, long retryInterval, - boolean unsafeByteBufferConversion) { + boolean unsafeByteBufferConversion + ) { + this.config = config; OmKeyInfo info = handler.getKeyInfo(); blockOutputStreamEntryPool = - new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor, - type, bufferSize, bufferFlushSize, isBufferFlushDelay, - bufferMaxSize, size, - watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber, - isMultipart, info, unsafeByteBufferConversion, - xceiverClientManager, handler.getId()); + new BlockOutputStreamEntryPool( + config, + omClient, + requestId, factor, type, + uploadID, partNumber, + isMultipart, info, + unsafeByteBufferConversion, + xceiverClientManager, + handler.getId()); + // Retrieve the file encryption key info, null if file is not in // encrypted bucket. this.feInfo = info.getFileEncryptionInfo(); this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException( - maxRetryCount, retryInterval); + config.getMaxRetryCount(), config.getRetryInterval()); this.retryCount = 0; this.isException = false; this.writeOffset = 0; @@ -258,8 +264,8 @@ private int writeToOutputStream(BlockOutputStreamEntry current, // to or less than the max length of the buffer allocated. // The len specified here is the combined sum of the data length of // the buffers - Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool - .getStreamBufferMaxSize()); + Preconditions.checkState(!retry || len <= config + .getStreamBufferMaxSize()); int dataWritten = (int) (current.getWrittenDataLength() - currentPos); writeLen = retry ? (int) len : dataWritten; // In retry path, the data written is already accounted in offset. @@ -310,7 +316,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount); } Preconditions.checkArgument( - bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize()); + bufferedDataLen <= config.getStreamBufferMaxSize()); Preconditions.checkArgument( offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen); long containerId = streamEntry.getBlockID().getContainerID(); @@ -549,20 +555,11 @@ public static class Builder { private String requestID; private ReplicationType type; private ReplicationFactor factor; - private int streamBufferSize; - private long streamBufferFlushSize; - private boolean streamBufferFlushDelay; - private long streamBufferMaxSize; - private long blockSize; - private long watchTimeout; - private ChecksumType checksumType; - private int bytesPerChecksum; private String multipartUploadID; private int multipartNumber; private boolean isMultipartKey; - private int maxRetryCount; - private long retryInterval; private boolean unsafeByteBufferConversion; + private OzoneClientConfig clientConfig; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -609,53 +606,13 @@ public Builder setFactor(ReplicationFactor replicationFactor) { return this; } - public Builder setStreamBufferSize(int size) { - this.streamBufferSize = size; - return this; - } - - public Builder setStreamBufferFlushSize(long size) { - this.streamBufferFlushSize = size; - return this; - } - - public Builder setStreamBufferFlushDelay(boolean isDelay) { - this.streamBufferFlushDelay = isDelay; - return this; - } - - public Builder setStreamBufferMaxSize(long size) { - this.streamBufferMaxSize = size; - return this; - } - - public Builder setBlockSize(long size) { - this.blockSize = size; - return this; - } - - public Builder setChecksumType(ChecksumType cType) { - this.checksumType = cType; - return this; - } - - public Builder setBytesPerChecksum(int bytes) { - this.bytesPerChecksum = bytes; - return this; - } - public Builder setIsMultipartKey(boolean isMultipart) { this.isMultipartKey = isMultipart; return this; } - public Builder setMaxRetryCount(int maxCount) { - this.maxRetryCount = maxCount; - return this; - } - - public Builder setRetryInterval(long retryIntervalInMS) { - this.retryInterval = retryIntervalInMS; + public Builder setConfig(OzoneClientConfig config) { + this.clientConfig = config; return this; } @@ -665,13 +622,19 @@ public Builder enableUnsafeByteBufferConversion(boolean enabled) { } public KeyOutputStream build() { - return new KeyOutputStream(openHandler, xceiverManager, omClient, - chunkSize, requestID, factor, type, - streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay, - streamBufferMaxSize, - blockSize, watchTimeout, checksumType, - bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey, - maxRetryCount, retryInterval, unsafeByteBufferConversion); + return new KeyOutputStream( + clientConfig, + openHandler, + xceiverManager, + omClient, + chunkSize, + requestID, + factor, + type, + multipartUploadID, + multipartNumber, + isMultipartKey, + unsafeByteBufferConversion); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index c61d0eb2074d..8c0ed41c78a4 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -42,8 +42,8 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.StorageType; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -104,7 +104,6 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType; import org.apache.hadoop.ozone.security.acl.OzoneAclConfig; import org.apache.hadoop.ozone.security.acl.OzoneObj; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -130,31 +129,24 @@ public class RpcClient implements ClientProtocol { private final OzoneManagerProtocol ozoneManagerClient; private final XceiverClientManager xceiverClientManager; private final int chunkSize; - private final ChecksumType checksumType; - private final int bytesPerChecksum; - private final boolean unsafeByteBufferConversion; - private boolean verifyChecksum; private final UserGroupInformation ugi; private final ACLType userRights; private final ACLType groupRights; - private final int streamBufferSize; - private final long streamBufferFlushSize; - private boolean streamBufferFlushDelay; - private final long streamBufferMaxSize; private final long blockSize; private final ClientId clientId = ClientId.randomId(); - private final int maxRetryCount; - private final long retryInterval; + private final boolean unsafeByteBufferConversion; private Text dtService; private final boolean topologyAwareReadEnabled; private final boolean checkKeyNameEnabled; + private final OzoneClientConfig clientConfig; /** - * Creates RpcClient instance with the given configuration. - * @param conf Configuration - * @param omServiceId OM HA Service ID, set this to null if not HA - * @throws IOException - */ + * Creates RpcClient instance with the given configuration. + * + * @param conf Configuration + * @param omServiceId OM HA Service ID, set this to null if not HA + * @throws IOException + */ public RpcClient(ConfigurationSource conf, String omServiceId) throws IOException { Preconditions.checkNotNull(conf); @@ -165,6 +157,8 @@ public RpcClient(ConfigurationSource conf, String omServiceId) this.userRights = aclConfig.getUserDefaultRights(); this.groupRights = aclConfig.getGroupDefaultRights(); + this.clientConfig = conf.getObject(OzoneClientConfig.class); + OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId); this.ozoneManagerClient = TracingUtil.createProxy( @@ -194,57 +188,14 @@ public RpcClient(ConfigurationSource conf, String omServiceId) } else { chunkSize = configuredChunkSize; } - streamBufferSize = (int) conf - .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE, - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT, - StorageUnit.BYTES); - streamBufferFlushSize = (long) conf - .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT, - StorageUnit.BYTES); - streamBufferFlushDelay = conf.getBoolean( - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT); - streamBufferMaxSize = (long) conf - .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT, - StorageUnit.BYTES); + blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); unsafeByteBufferConversion = conf.getBoolean( OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); - - int configuredChecksumSize = (int) conf.getStorageSize( - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT, - StorageUnit.BYTES); - if(configuredChecksumSize < - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { - LOG.warn("The checksum size ({}) is not allowed to be less than the " + - "minimum size ({}), resetting to the minimum size.", - configuredChecksumSize, - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); - bytesPerChecksum = - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; - } else { - bytesPerChecksum = configuredChecksumSize; - } - String checksumTypeStr = conf.get( - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, - OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - checksumType = ChecksumType.valueOf(checksumTypeStr); - this.verifyChecksum = - conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM, - OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT); - maxRetryCount = - conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys. - OZONE_CLIENT_MAX_RETRIES_DEFAULT); - retryInterval = OzoneUtils.getTimeDurationInMS(conf, - OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, - OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT); topologyAwareReadEnabled = conf.getBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); @@ -715,7 +666,7 @@ public OzoneOutputStream createKey( throws IOException { verifyVolumeName(volumeName); verifyBucketName(bucketName); - if(checkKeyNameEnabled) { + if (clientConfig.isStreamBufferFlushDelay()) { HddsClientUtils.verifyKeyName(keyName); } HddsClientUtils.checkNotNull(keyName, type, factor); @@ -963,21 +914,14 @@ public OzoneOutputStream createMultipartKey(String volumeName, .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) - .setChunkSize(chunkSize) .setRequestID(requestId) .setType(openKey.getKeyInfo().getType()) .setFactor(openKey.getKeyInfo().getFactor()) - .setStreamBufferSize(streamBufferSize) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setBlockSize(blockSize) - .setBytesPerChecksum(bytesPerChecksum) - .setChecksumType(checksumType) .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) - .setMaxRetryCount(maxRetryCount) - .setRetryInterval(retryInterval) + .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) + .setConfig(clientConfig) .build(); keyOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), @@ -1234,7 +1178,7 @@ private OzoneInputStream createInputStream( throws IOException { LengthInputStream lengthInputStream = KeyInputStream .getFromOmKeyInfo(keyInfo, xceiverClientManager, - verifyChecksum, retryFunction); + clientConfig.isChecksumVerify(), retryFunction); FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo(); if (feInfo != null) { final KeyProvider.KeyVersion decrypted = getDEK(feInfo); @@ -1271,20 +1215,11 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey, .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) .setOmClient(ozoneManagerClient) - .setChunkSize(chunkSize) .setRequestID(requestId) .setType(HddsProtos.ReplicationType.valueOf(type.toString())) .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) - .setStreamBufferSize(streamBufferSize) - .setStreamBufferFlushSize(streamBufferFlushSize) - .setStreamBufferFlushDelay(streamBufferFlushDelay) - .setStreamBufferMaxSize(streamBufferMaxSize) - .setBlockSize(blockSize) - .setChecksumType(checksumType) - .setBytesPerChecksum(bytesPerChecksum) - .setMaxRetryCount(maxRetryCount) - .setRetryInterval(retryInterval) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) + .setConfig(clientConfig) .build(); keyOutputStream .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java index 3267976f7670..f0dfba88e01f 100644 --- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java +++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java @@ -18,20 +18,22 @@ package org.apache.hadoop.ozone; +import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; -import java.util.List; +import java.util.Collections; import java.util.HashSet; -import java.util.ArrayList; +import java.util.List; import java.util.Set; -import java.util.Collections; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -42,12 +44,11 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; + +import org.apache.commons.lang3.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - /** * This class causes random failures in the chaos cluster. */ @@ -198,16 +199,17 @@ public Builder addFailures(Class clazz) { protected void initializeConfiguration() throws IOException { super.initializeConfiguration(); + + OzoneClientConfig clientConfig =new OzoneClientConfig(); + clientConfig.setStreamBufferFlushSize(8 * 1024 * 1024); + clientConfig.setStreamBufferMaxSize(16 * 1024 * 1024); + clientConfig.setStreamBufferSize(4 * 1024); + conf.setFromObject(clientConfig); + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, 4, StorageUnit.KB); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 32, StorageUnit.KB); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, - 8, StorageUnit.KB); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, - 16, StorageUnit.KB); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE, - 4, StorageUnit.KB); conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, 1, StorageUnit.MB); conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 1c1fcc657c83..629ab5af098d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -17,19 +17,6 @@ */ package org.apache.hadoop.ozone; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; -import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY; -import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR; - import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -46,14 +33,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.DFSConfigKeysLegacy; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; @@ -79,6 +66,20 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; + +import org.apache.commons.io.FileUtils; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; +import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY; +import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR; import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -596,14 +597,20 @@ protected void initializeConfiguration() throws IOException { if (!streamBufferSizeUnit.isPresent()) { streamBufferSizeUnit = Optional.of(StorageUnit.MB); } + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferSize( + (int) Math.round( + streamBufferSizeUnit.get().toBytes(streamBufferSize.getAsInt()))); + clientConfig.setStreamBufferMaxSize(Math.round( + streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get()))); + clientConfig.setStreamBufferFlushSize(Math.round( + streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get()))); + conf.setFromObject(clientConfig); + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize.get(), streamBufferSizeUnit.get()); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE, - streamBufferSize.getAsInt(), streamBufferSizeUnit.get()); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, - streamBufferFlushSize.get(), streamBufferSizeUnit.get()); - conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, - streamBufferMaxSize.get(), streamBufferSizeUnit.get()); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(), streamBufferSizeUnit.get()); // MiniOzoneCluster should have global pipeline upper limit. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 44b47dc270b9..639a64db626f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -17,10 +17,17 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; @@ -33,22 +40,16 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests BlockOutputStream class. @@ -85,13 +86,20 @@ public static void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); - conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) .setTotalPipelineNumLimit(10) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java index 93a3ad62dee2..ab12cd9b6beb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java @@ -17,6 +17,11 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -33,21 +38,16 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests TestBlockOutputStreamFlushDelay class. @@ -84,9 +84,9 @@ public static void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 4d7a6db984bd..8463c1d6a8e1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -16,12 +16,21 @@ */ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -37,27 +46,19 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.exceptions.GroupMismatchException; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -95,10 +96,14 @@ public void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS); conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 10, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); @@ -115,7 +120,9 @@ public void init() throws Exception { raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3)); conf.setFromObject(raftClientConfig); - conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); RatisClientConfig ratisClientConfig = conf.getObject(RatisClientConfig.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java index f1ed5b162718..21e43e64d682 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java @@ -16,13 +16,21 @@ */ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -38,20 +46,20 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import java.io.IOException; -import java.io.OutputStream; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; - /** * Tests failure detection by set flush delay and handling in * BlockOutputStream Class. @@ -89,10 +97,14 @@ public void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS); conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 10, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 4bac4feb2e09..12ba4e6c1b05 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -17,18 +17,26 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -37,23 +45,16 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; - +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; /** * Tests Close Container Exception handling by Ozone Client. @@ -87,9 +88,13 @@ public class TestCloseContainerHandlingByClient { public static void init() throws Exception { chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java index abef9f2cfc57..a658cf449cee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientRatis; @@ -125,7 +127,10 @@ public void init() throws Exception { ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10)); conf.setFromObject(ratisClientConfig); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); + OzoneClientConfig clientConfig = new OzoneClientConfig(); + clientConfig.setChecksumType(ChecksumType.NONE); + conf.setFromObject(clientConfig); + conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java index 6431b704a632..3dfddfefd878 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -17,11 +17,19 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.CertificateClientTestImpl; @@ -36,26 +44,19 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.test.GenericTestUtils; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests the containerStateMachine failure handling. @@ -99,7 +100,11 @@ public void setup() throws Exception { conf.setQuietMode(false); OzoneManager.setTestSecureOmFlag(true); conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1); - conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + // conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString()); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 82836a3ef63d..34f6964be94d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -17,6 +17,19 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; @@ -25,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -44,49 +58,28 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; -import org.apache.hadoop.ozone.container.common.transport.server.ratis. - ContainerStateMachine; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.exceptions.StateMachineException; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; - +import static org.hamcrest.core.Is.is; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdds.HddsConfigKeys. - HDDS_COMMAND_STATUS_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys. - HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_PIPELINE_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - ContainerDataProto.State.UNHEALTHY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys. - OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys. - OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import org.junit.BeforeClass; +import org.junit.Test; /** * Tests the containerStateMachine failure handling. @@ -110,8 +103,11 @@ public class TestContainerStateMachineFailures { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - conf.setBoolean(OzoneConfigKeys. - OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java index 37e13b6f952a..061c5e128e1c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java @@ -17,10 +17,18 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -31,27 +39,22 @@ import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.*; +import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; - +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; /** * Tests Close Container Exception handling by Ozone Client. @@ -86,9 +89,13 @@ public class TestDiscardPreallocatedBlocks{ public static void init() throws Exception { chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index a9c0706e04ac..b44427b18d28 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -122,8 +123,10 @@ private void init() throws Exception { raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3)); conf.setFromObject(raftClientConfig); - conf.setBoolean( - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + conf.setQuietMode(false); conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, StaticMapping.class, DNSToSwitchMapping.class); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java index 7775bb7def3c..4dbb0b60cf0e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java @@ -17,10 +17,18 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; @@ -34,26 +42,19 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; - /** * Tests {@link KeyInputStream}. */ @@ -96,14 +97,18 @@ public void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setBytesPerChecksum(256 * 1024 * 1024); + conf.setFromObject(config); + + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB); conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.name()); - conf.setStorageSize( - OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, 256, StorageUnit.KB); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .setTotalPipelineNumLimit(5) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index dd871f339056..d885d38da748 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -16,22 +16,29 @@ */ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers. - ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -40,24 +47,16 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY; /** * Tests failure detection and handling in BlockOutputStream Class. @@ -96,12 +95,20 @@ public void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + + OzoneClientConfig config = new OzoneClientConfig(); + config.setMaxRetryCount(3); + config.setChecksumType(ChecksumType.NONE); + conf.setFromObject(config); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); - conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); conf.setQuietMode(false); - conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) .setTotalPipelineNumLimit(10) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java index e202ca18afd5..14bce991bdb6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java @@ -16,10 +16,17 @@ */ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; @@ -29,7 +36,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; @@ -37,17 +43,16 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.ratis.protocol.exceptions.GroupMismatchException; -import org.junit.*; -import org.junit.rules.Timeout; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import org.apache.ratis.protocol.exceptions.GroupMismatchException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; /** * Tests failure detection and handling in BlockOutputStream Class by set @@ -89,8 +94,12 @@ public void init() throws Exception { blockSize = 2 * maxFlushSize; conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); - conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); - conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3); + + OzoneClientConfig config = new OzoneClientConfig(); + config.setChecksumType(ChecksumType.NONE); + config.setMaxRetryCount(3); + conf.setFromObject(config); + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 87c19ae75fdd..17cc0ce99424 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -18,12 +18,14 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; + import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; + import org.junit.AfterClass; import org.junit.BeforeClass; - -import java.io.IOException; import org.junit.Rule; import org.junit.rules.Timeout; @@ -49,6 +51,7 @@ public class TestOzoneRpcClient extends TestOzoneRpcClientAbstract { @BeforeClass public static void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setFromObject(new OzoneClientConfig()); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); startCluster(conf); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index b7b75a4f8403..d04e97661bf7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -52,7 +53,6 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneAcl; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.client.BucketArgs; @@ -106,7 +106,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; - import static org.apache.hadoop.hdds.StringUtils.string2Bytes; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; @@ -116,13 +115,12 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.GB; -import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_RENAME; import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP; import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER; import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ; - import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -130,7 +128,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import org.junit.Test; /** @@ -1466,9 +1463,14 @@ private void createAndCorruptKey(String volumeName, String bucketName, private void readCorruptedKey(String volumeName, String bucketName, String keyName, boolean verifyChecksum) { try { + OzoneConfiguration configuration = cluster.getConf(); - configuration.setBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM, - verifyChecksum); + + final OzoneClientConfig clientConfig = + configuration.getObject(OzoneClientConfig.class); + clientConfig.setChecksumVerify(verifyChecksum); + configuration.setFromObject(clientConfig); + RpcClient client = new RpcClient(configuration, null); OzoneInputStream is = client.getKey(volumeName, bucketName, keyName); is.read(new byte[100]); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index 10400b3ef988..ac84f172aedf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; @@ -62,6 +63,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.setFromObject(new OzoneClientConfig()); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java index 84416d29ead9..483823c78d1d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java @@ -17,6 +17,14 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; @@ -25,10 +33,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; @@ -45,6 +53,11 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.junit.AfterClass; @@ -52,25 +65,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.hdds.HddsConfigKeys. - HDDS_COMMAND_STATUS_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys. - HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_PIPELINE_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys. - OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys. - OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; - /** * Tests the containerStateMachine failure handling. */ @@ -92,8 +86,11 @@ public class TestValidateBCSIDOnRestart { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - conf.setBoolean(OzoneConfigKeys. - OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index c92db45d769e..9058d3449bd5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -17,6 +17,17 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; @@ -24,7 +35,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; -import org.apache.hadoop.hdds.scm.*; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -32,7 +47,6 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; @@ -42,27 +56,16 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; import org.apache.hadoop.test.GenericTestUtils; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.exceptions.GroupMismatchException; +import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.After; import org.junit.Test; -import java.io.IOException; -import java.io.OutputStream; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; - /** * This class verifies the watchForCommit Handling by xceiverClient. */ @@ -96,8 +99,11 @@ public void init() throws Exception { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; - conf.setBoolean( - OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false); + + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); + conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10, TimeUnit.SECONDS); conf.setQuietMode(false); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java index 0bdb4a3171fd..620fd859fc8a 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java @@ -19,12 +19,12 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.Callable; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; @@ -34,9 +34,10 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.OzoneSecurityUtil; +import org.apache.hadoop.ozone.common.Checksum; import com.codahale.metrics.Timer; -import org.apache.hadoop.ozone.common.Checksum; +import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine.Command; @@ -105,7 +106,7 @@ public Void call() throws Exception { byte[] data = RandomStringUtils.randomAscii(chunkSize) .getBytes(StandardCharsets.UTF_8); - Checksum checksum = new Checksum(); + Checksum checksum = new Checksum(ChecksumType.CRC32, 1024 * 1024); checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage(); runTests(this::putBlock);