diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 44af34cb919c..d1dcc654b100 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -223,6 +223,15 @@ public enum ChecksumCombineMode { tags = ConfigTag.CLIENT) private String fsDefaultBucketLayout = "FILE_SYSTEM_OPTIMIZED"; + @Config(key = "incremental.chunk.list", + defaultValue = "false", + type = ConfigType.BOOLEAN, + description = "Client PutBlock requests can carry an incremental chunk " + + "list rather than the full chunk list to improve performance. " + + "Critical for HBase.", + tags = ConfigTag.CLIENT) + private boolean incrementalChunkList = false; + @PostConstruct private void validate() { Preconditions.checkState(streamBufferSize > 0); @@ -404,4 +413,12 @@ public boolean isDatastreamPipelineMode() { public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { this.datastreamPipelineMode = datastreamPipelineMode; } + + public void setIncrementalChunkList(boolean enable) { + this.incrementalChunkList = enable; + } + + public boolean getIncrementalChunkList() { + return this.incrementalChunkList; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index a12f9067ce2d..78cef5f1b4b4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -63,7 +63,7 @@ */ public class BlockInputStream extends BlockExtendedInputStream { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(BlockInputStream.class); private final BlockID blockID; @@ -256,8 +256,8 @@ protected BlockData getBlockDataUsingClient() throws IOException { final Pipeline pipeline = xceiverClient.getPipeline(); if (LOG.isDebugEnabled()) { - LOG.debug("Initializing BlockInputStream for get key to access {}", - blockID.getContainerID()); + LOG.debug("Initializing BlockInputStream for get key to access block {}", + blockID); } DatanodeBlockID.Builder blkIDBuilder = diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index ac21411ea5a0..1308de4f45ab 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.storage; import java.io.IOException; import java.io.OutputStream; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -80,6 +82,12 @@ public class BlockOutputStream extends OutputStream { LoggerFactory.getLogger(BlockOutputStream.class); public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: "; + public static final String INCREMENTAL_CHUNK_LIST = "incremental"; + public static final KeyValue INCREMENTAL_CHUNK_LIST_KV = + KeyValue.newBuilder().setKey(INCREMENTAL_CHUNK_LIST).build(); + public static final String FULL_CHUNK = "full";
+ public static final KeyValue FULL_CHUNK_KV = + KeyValue.newBuilder().setKey(FULL_CHUNK).build(); private AtomicReference blockID; private final AtomicReference previousChunkInfo @@ -123,6 +131,10 @@ public class BlockOutputStream extends OutputStream { private int currentBufferRemaining; //current buffer allocated to write private ChunkBuffer currentBuffer; + // lastChunkBuffer holds the data after the last complete chunk, which may be + // different from currentBuffer. We need it to calculate the checksum of the + // last partial chunk. + private ByteBuffer lastChunkBuffer; + private long lastChunkOffset; private final Token token; private final String tokenString; private int replicationIndex; @@ -164,6 +176,13 @@ public BlockOutputStream( } this.containerBlockData = BlockData.newBuilder().setBlockID( blkIDBuilder.build()).addMetadata(keyValue); + // tell the DataNode that this client will send incremental chunk lists + if (config.getIncrementalChunkList()) { + this.containerBlockData.addMetadata(INCREMENTAL_CHUNK_LIST_KV); + this.lastChunkBuffer = + ByteBuffer.allocate(config.getStreamBufferSize()); + this.lastChunkOffset = 0; + } this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.bufferPool = bufferPool; this.token = token; @@ -468,6 +487,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close, ContainerCommandResponseProto> flushFuture = null; try { BlockData blockData = containerBlockData.build(); + LOG.debug("sending PutBlock {}", blockData); + + if (config.getIncrementalChunkList()) { + // remove the chunks from the containerBlockData list, + // since they have already been sent. + containerBlockData.clearChunks(); + } + XceiverClientReply asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString); CompletableFuture future = @@ -746,7 +773,12 @@ CompletableFuture writeChunkToContainer( setIoException(ce); throw ce; }); - containerBlockData.addChunks(chunkInfo); + if (config.getIncrementalChunkList()) { + updateBlockDataForWriteChunk(chunk); + } else { + containerBlockData.addChunks(chunkInfo); + } + clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen()); return validateFuture; } catch (IOException | ExecutionException e) { @@ -758,6 +790,156 @@ CompletableFuture writeChunkToContainer( return null; } + /** + * Update container block data, which is later sent to DataNodes via PutBlock, + * using the new chunks sent out via WriteChunk. + * + * This method is only used when the incremental chunk list is enabled. + * @param chunk the chunk buffer to be sent out by WriteChunk. + * @throws OzoneChecksumException if checksum computation fails. + */ + private void updateBlockDataForWriteChunk(ChunkBuffer chunk) + throws OzoneChecksumException { + // Update lastChunkBuffer using the new chunk data. + // This is used to calculate the checksum for the last partial chunk in + // containerBlockData, which will be used by PutBlock. + + // the last partial chunk in containerBlockData will be replaced, + // so remove it. + removeLastPartialChunk(); + chunk.rewind(); + LOG.debug("Adding chunk pos {} limit {} remaining {}. " + + "lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = {}", + chunk.position(), chunk.limit(), chunk.remaining(), + lastChunkBuffer.position(), lastChunkBuffer.limit(), + lastChunkBuffer.remaining(), lastChunkOffset); + + // Append the chunk to the last chunk buffer. + // If the resulting size exceeds the limit (4 MB by default), + // flush the full chunk into the block data and keep the remainder.
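+ // For example, assuming the default 4 MB stream buffer size: if
+ // lastChunkBuffer already holds 3 MB and the incoming chunk has 2 MB,
+ // the first 1 MB is copied to fill the buffer, the full 4 MB chunk is
+ // flushed into containerBlockData via updateBlockDataWithLastChunkBuffer(),
+ // and the remaining 1 MB is copied into the emptied buffer.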
+ if (lastChunkBuffer.position() + chunk.remaining() <= + lastChunkBuffer.capacity()) { + appendLastChunkBuffer(chunk, 0, chunk.remaining()); + } else { + int remainingBufferSize = + lastChunkBuffer.capacity() - lastChunkBuffer.position(); + appendLastChunkBuffer(chunk, 0, remainingBufferSize); + updateBlockDataWithLastChunkBuffer(); + appendLastChunkBuffer(chunk, remainingBufferSize, + chunk.remaining() - remainingBufferSize); + } + LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}", + lastChunkBuffer, lastChunkOffset); + + updateBlockDataWithLastChunkBuffer(); + } + + private void updateBlockDataWithLastChunkBuffer() + throws OzoneChecksumException { + // create chunk info for lastChunkBuffer + ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset); + LOG.debug("lastChunkInfo = {}", lastChunkInfo); + long lastChunkSize = lastChunkInfo.getLen(); + addToBlockData(lastChunkInfo); + + lastChunkBuffer.clear(); + if (lastChunkSize == config.getStreamBufferSize()) { + lastChunkOffset += config.getStreamBufferSize(); + } else { + lastChunkBuffer.position((int) lastChunkSize); + } + } + + private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset, + int length) { + LOG.debug("copying to last chunk buffer offset={} length={}", + offset, length); + int pos = 0; + int uncopied = length; + for (ByteBuffer bb : chunkBuffer.asByteBufferList()) { + if (pos + bb.remaining() >= offset) { + int copyStart = offset < pos ? 0 : offset - pos; + int copyLen = Math.min(uncopied, bb.remaining()); + try { + LOG.debug("put into last chunk buffer start = {} len = {}", + copyStart, copyLen); + lastChunkBuffer.put(bb.array(), copyStart, copyLen); + } catch (BufferOverflowException e) { + LOG.error("appending from " + copyStart + " for len=" + copyLen + + ". lastChunkBuffer remaining=" + lastChunkBuffer.remaining() + + " pos=" + lastChunkBuffer.position() + + " limit=" + lastChunkBuffer.limit() + + " capacity=" + lastChunkBuffer.capacity()); + throw e; + } + + uncopied -= copyLen; + } + + pos += bb.remaining(); + if (pos >= offset + length) { + return; + } + if (uncopied == 0) { + return; + } + } + } + + private void removeLastPartialChunk() { + // remove the last chunk if it's partial. + if (containerBlockData.getChunksList().isEmpty()) { + return; + } + int lastChunkIndex = containerBlockData.getChunksCount() - 1; + ChunkInfo lastChunkInBlockData = containerBlockData.getChunks( + lastChunkIndex); + if (!isFullChunk(lastChunkInBlockData)) { + containerBlockData.removeChunks(lastChunkIndex); + } + } + + private ChunkInfo createChunkInfo(long lastPartialChunkOffset) + throws OzoneChecksumException { + lastChunkBuffer.flip(); + int revisedChunkSize = lastChunkBuffer.remaining(); + // create the chunk info to be sent in PutBlock. 
+ ChecksumData revisedChecksumData = + checksum.computeChecksum(lastChunkBuffer); + + long chunkID = lastPartialChunkOffset / config.getStreamBufferSize(); + ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder() + .setChunkName(blockID.get().getLocalID() + "_chunk_" + chunkID) + .setOffset(lastPartialChunkOffset) + .setLen(revisedChunkSize) + .setChecksumData(revisedChecksumData.getProtoBufMessage()); + // if full chunk + if (revisedChunkSize == config.getStreamBufferSize()) { + revisedChunkInfo.addMetadata(FULL_CHUNK_KV); + } + return revisedChunkInfo.build(); + } + + private boolean isFullChunk(ChunkInfo chunkInfo) { + Preconditions.checkState( + chunkInfo.getLen() <= config.getStreamBufferSize()); + return chunkInfo.getLen() == config.getStreamBufferSize(); + } + + private void addToBlockData(ChunkInfo revisedChunkInfo) { + LOG.debug("containerBlockData chunk: {}", containerBlockData); + if (containerBlockData.getChunksCount() > 0) { + ChunkInfo lastChunk = containerBlockData.getChunks( + containerBlockData.getChunksCount() - 1); + LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo); + Preconditions.checkState(lastChunk.getOffset() + lastChunk.getLen() == + revisedChunkInfo.getOffset(), + "lastChunk.getOffset() + lastChunk.getLen() " + + "!= revisedChunkInfo.getOffset()"); + } + containerBlockData.addChunks(revisedChunkInfo); + } + @VisibleForTesting public void setXceiverClient(XceiverClientSpi xceiverClient) { this.xceiverClient = xceiverClient; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java index 51e453350083..84000ba2fb96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreWithIncrementalChunkList.java @@ -94,12 +94,16 @@ private void reconcilePartialChunks( LOG.debug("blockData={}, lastChunk={}", blockData.getChunks(), lastChunk.getChunks()); Preconditions.checkState(lastChunk.getChunks().size() == 1); - ContainerProtos.ChunkInfo lastChunkInBlockData = - blockData.getChunks().get(blockData.getChunks().size() - 1); - Preconditions.checkState( - lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen() - == lastChunk.getChunks().get(0).getOffset(), - "chunk offset does not match"); + if (!blockData.getChunks().isEmpty()) { + ContainerProtos.ChunkInfo lastChunkInBlockData = + blockData.getChunks().get(blockData.getChunks().size() - 1); + if (lastChunkInBlockData != null) { + Preconditions.checkState( + lastChunkInBlockData.getOffset() + lastChunkInBlockData.getLen() + == lastChunk.getChunks().get(0).getOffset(), + "chunk offset does not match"); + } + } // append last partial chunk to the block data List chunkInfos = @@ -136,7 +140,7 @@ private static boolean shouldAppendLastChunk(boolean endOfBlock, public void putBlockByID(BatchOperation batch, boolean incremental, long localID, BlockData data, KeyValueContainerData containerData, boolean endOfBlock) throws IOException { - if (!incremental && !isPartialChunkList(data)) { + if (!incremental || !isPartialChunkList(data)) { // Case (1) old client: override chunk list. 
getBlockDataTable().putWithBatch( batch, containerData.getBlockKey(localID), data); @@ -151,14 +155,21 @@ public void putBlockByID(BatchOperation batch, boolean incremental, private void moveLastChunkToBlockData(BatchOperation batch, long localID, BlockData data, KeyValueContainerData containerData) throws IOException { + // if data has no chunks, fetch the last chunk info from lastChunkInfoTable + if (data.getChunks().isEmpty()) { + BlockData lastChunk = getLastChunkInfoTable().get(containerData.getBlockKey(localID)); + if (lastChunk != null) { + reconcilePartialChunks(lastChunk, data); + } + } // if eob or if the last chunk is full, // the 'data' is full so append it to the block table's chunk info // and then remove from lastChunkInfo BlockData blockData = getBlockDataTable().get( containerData.getBlockKey(localID)); if (blockData == null) { - // Case 2.1 if the block did not have full chunks before, - // the block's chunk is what received from client this time. + // Case 2.1: if the block did not have full chunks before, + // the block's chunks are what was received from the client this time, + // plus the chunks in lastChunkInfoTable. blockData = data; } else { // case 2.2 the block already has some full chunks diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java index 38a01e46900d..26d959e88600 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestBlockManagerImpl.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.UUID; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion; @@ -83,6 +84,7 @@ private void initTest(ContainerTestVersionInfo versionInfo) this.schemaVersion = versionInfo.getSchemaVersion(); this.config = new OzoneConfiguration(); ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, config); + config.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true); initilaze(); } diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java index e4a8a80a6318..31f5e20bc882 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java @@ -56,6 +56,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; @@ -67,6 +69,8 @@ * OM transport for testing with in-memory state.
*/ public class MockOmTransport implements OmTransport { + private static final Logger LOG = + LoggerFactory.getLogger(MockOmTransport.class); private final MockBlockAllocator blockAllocator; //volumename -> volumeinfo @@ -185,11 +189,44 @@ private GetKeyInfoResponse getKeyInfo(GetKeyInfoRequest request) { .build(); } + private boolean isHSync(CommitKeyRequest commitKeyRequest) { + return commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + } + + private boolean isRecovery(CommitKeyRequest commitKeyRequest) { + return commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); + } + + private String toOperationString(CommitKeyRequest commitKeyRequest) { + boolean hsync = isHSync(commitKeyRequest); + boolean recovery = isRecovery(commitKeyRequest); + if (hsync) { + return "hsync"; + } + if (recovery) { + return "recover"; + } + return "commit"; + } + + private CommitKeyResponse commitKey(CommitKeyRequest commitKeyRequest) { final KeyArgs keyArgs = commitKeyRequest.getKeyArgs(); final KeyInfo openKey = openKeys.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName()) - .remove(keyArgs.getKeyName()); + .get(keyArgs.getKeyName()); + LOG.debug("{} open key vol: {} bucket: {} key: {}", + toOperationString(commitKeyRequest), + keyArgs.getVolumeName(), + keyArgs.getBucketName(), + keyArgs.getKeyName()); + boolean hsync = isHSync(commitKeyRequest); + if (!hsync) { + KeyInfo deleteKey = openKeys.get(keyArgs.getVolumeName()) + .get(keyArgs.getBucketName()) + .remove(keyArgs.getKeyName()); + assert deleteKey != null; + } final KeyInfo.Builder committedKeyInfoWithLocations = KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName()) .setBucketName(keyArgs.getBucketName()) diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java new file mode 100644 index 000000000000..68549e4104b4 --- /dev/null +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestBlockOutputStreamIncrementalPutBlock.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.UUID; + +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx; +import org.jetbrains.annotations.NotNull; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.InMemoryConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.client.rpc.RpcClient; +import org.apache.hadoop.ozone.om.protocolPB.OmTransport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + + +/** + * Verify BlockOutputStream with incremental PutBlock feature. + * (ozone.client.incremental.chunk.list = true) + */ +public class TestBlockOutputStreamIncrementalPutBlock { + private OzoneClient client; + private final String keyName = UUID.randomUUID().toString(); + private final String volumeName = UUID.randomUUID().toString(); + private final String bucketName = UUID.randomUUID().toString(); + private OzoneBucket bucket; + private final ConfigurationSource config = new InMemoryConfiguration(); + + public static Iterable parameters() { + return Arrays.asList(true, false); + } + + private void init(boolean incrementalChunkList) throws IOException { + OzoneClientConfig clientConfig = config.getObject(OzoneClientConfig.class); + + clientConfig.setIncrementalChunkList(incrementalChunkList); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C); + + ((InMemoryConfiguration)config).setFromObject(clientConfig); + + ((InMemoryConfiguration) config).setBoolean( + OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); + ((InMemoryConfiguration) config).setBoolean( + OZONE_CHUNK_LIST_INCREMENTAL, incrementalChunkList); + + RpcClient rpcClient = new RpcClient(config, null) { + + @Override + protected OmTransport createOmTransport( + String omServiceId) + throws IOException { + return new MockOmTransport(); + } + + @NotNull + @Override + protected XceiverClientFactory createXceiverClientFactory( + ServiceInfoEx serviceInfo) throws IOException { + return new MockXceiverClientFactory(); + } + }; + + client = new OzoneClient(config, rpcClient); + ObjectStore store = client.getObjectStore(); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + bucket = volume.getBucket(bucketName); + } + + @AfterEach + public void close() throws IOException { + client.close(); + } + + @ParameterizedTest + @MethodSource("parameters") + public void writeSmallChunk(boolean incrementalChunkList) + throws IOException { + init(incrementalChunkList); + + int size = 1024; + String s = RandomStringUtils.randomAlphabetic(1024); + ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); + + try (OzoneOutputStream out = 
bucket.createKey(keyName, size, + ReplicationConfig.getDefault(config), new HashMap<>())) { + for (int i = 0; i < 4097; i++) { + out.write(byteBuffer); + out.hsync(); + } + } + + try (OzoneInputStream is = bucket.readKey(keyName)) { + ByteBuffer readBuffer = ByteBuffer.allocate(size); + for (int i = 0; i < 4097; i++) { + is.read(readBuffer); + assertArrayEquals(readBuffer.array(), byteBuffer.array()); + } + } + } + + @ParameterizedTest + @MethodSource("parameters") + public void writeLargeChunk(boolean incrementalChunkList) + throws IOException { + init(incrementalChunkList); + + int size = 1024 * 1024 + 1; + ByteBuffer byteBuffer = ByteBuffer.allocate(size); + + try (OzoneOutputStream out = bucket.createKey(keyName, size, + ReplicationConfig.getDefault(config), new HashMap<>())) { + for (int i = 0; i < 4; i++) { + out.write(byteBuffer); + out.hsync(); + } + } + + try (OzoneInputStream is = bucket.readKey(keyName)) { + ByteBuffer readBuffer = ByteBuffer.allocate(size); + for (int i = 0; i < 4; i++) { + is.read(readBuffer); + assertArrayEquals(readBuffer.array(), byteBuffer.array()); + } + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 8d7439604e8c..cd318e88bafe 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -21,6 +21,8 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.HashMap; @@ -28,12 +30,19 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; + +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.crypto.CipherSuite; @@ -58,8 +67,10 @@ import org.apache.hadoop.ozone.client.BucketArgs; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.io.ECKeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMMetrics; @@ -82,11 +93,16 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_LIST_INCREMENTAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; @@ -96,11 +112,13 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -120,12 +138,13 @@ public class TestHSync { private static OzoneClient client; private static final BucketLayout BUCKET_LAYOUT = BucketLayout.FILE_SYSTEM_OPTIMIZED; + private static final int CHUNK_SIZE = 4 << 12; + private static final int FLUSH_SIZE = 2 * CHUNK_SIZE; + private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; + private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; + @BeforeAll public static void init() throws Exception { - final int chunkSize = 4 << 10; - final int flushSize = 2 * chunkSize; - final int maxFlushSize = 2 * flushSize; - final int blockSize = 2 * maxFlushSize; final BucketLayout layout = BUCKET_LAYOUT; CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false); @@ -133,17 +152,19 @@ public static void init() throws Exception { CONF.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); // Reduce KeyDeletingService interval CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); + CONF.setBoolean("ozone.client.incremental.chunk.list", true); + CONF.setBoolean(OZONE_CHUNK_LIST_INCREMENTAL, true); cluster = MiniOzoneCluster.newBuilder(CONF) .setNumDatanodes(5) .setTotalPipelineNumLimit(10) - .setBlockSize(blockSize) - .setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) - .setStreamBufferMaxSize(maxFlushSize) - .setDataStreamBufferFlushize(maxFlushSize) + .setBlockSize(BLOCK_SIZE) + .setChunkSize(CHUNK_SIZE) + .setStreamBufferFlushSize(FLUSH_SIZE) + .setStreamBufferMaxSize(MAX_FLUSH_SIZE) + .setDataStreamBufferFlushize(MAX_FLUSH_SIZE) .setStreamBufferSizeUnit(StorageUnit.BYTES) - .setDataStreamMinPacketSize(chunkSize) - .setDataStreamStreamWindowSize(5 * chunkSize) + .setDataStreamMinPacketSize(CHUNK_SIZE) + .setDataStreamStreamWindowSize(5 * CHUNK_SIZE) .build(); cluster.waitForClusterToBeReady(); client = cluster.newClient(); @@ -155,6 +176,8 @@ public static void init() throws Exception { GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockOutputStream.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG); } 
@AfterAll @@ -287,13 +310,15 @@ public void testKeyHSyncThenClose() throws Exception { } } - @Test - public void testO3fsHSync() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testO3fsHSync(boolean incrementalChunkList) throws Exception { // Set the fs.defaultFS final String rootPath = String.format("%s://%s.%s/", OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + initClientConfig(incrementalChunkList); try (FileSystem fs = FileSystem.get(CONF)) { for (int i = 0; i < 10; i++) { final Path file = new Path("/file" + i); @@ -302,8 +327,10 @@ public void testO3fsHSync() throws Exception { } } - @Test - public void testOfsHSync() throws Exception { + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testOfsHSync(boolean incrementalChunkList) throws Exception { // Set the fs.defaultFS final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY)); @@ -312,6 +339,7 @@ public void testOfsHSync() throws Exception { final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + initClientConfig(incrementalChunkList); try (FileSystem fs = FileSystem.get(CONF)) { for (int i = 0; i < 10; i++) { final Path file = new Path(dir, "file" + i); @@ -429,13 +457,11 @@ public void testHsyncKeyCallCount() throws Exception { ThreadLocalRandom.current().nextBytes(data); final Path file = new Path(dir, "file-hsync-then-close"); - long blockSize; try (FileSystem fs = FileSystem.get(CONF)) { - blockSize = fs.getDefaultBlockSize(file); long fileSize = 0; try (FSDataOutputStream outputStream = fs.create(file, true)) { // make sure at least writing 2 blocks data - while (fileSize <= blockSize) { + while (fileSize <= BLOCK_SIZE) { outputStream.write(data, 0, data.length); outputStream.hsync(); fileSize += data.length; @@ -448,9 +474,9 @@ public void testHsyncKeyCallCount() throws Exception { omMetrics.resetNumKeyHSyncs(); long writtenSize = 0; try (OzoneOutputStream outputStream = bucket.createKey("key-" + RandomStringUtils.randomNumeric(5), - blockSize * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) { + BLOCK_SIZE * 2, ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>())) { // make sure at least writing 2 blocks data - while (writtenSize <= blockSize) { + while (writtenSize <= BLOCK_SIZE) { outputStream.write(data, 0, data.length); outputStream.hsync(); writtenSize += data.length; @@ -733,4 +759,117 @@ private void testEncryptedStreamCapabilities(boolean isEC) throws IOException, assertFalse(cofsos.hasCapability(StreamCapabilities.HFLUSH)); } } + + public void initClientConfig(boolean incrementalChunkList) { + OzoneClientConfig clientConfig = CONF.getObject(OzoneClientConfig.class); + clientConfig.setIncrementalChunkList(incrementalChunkList); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C); + CONF.setFromObject(clientConfig); + } + + public static Stream parameters1() { + return Stream.of( + arguments(true, 512), + arguments(true, 511), + arguments(true, 513), + arguments(false, 512), + arguments(false, 511), + arguments(false, 513) + ); + } + + @ParameterizedTest + @MethodSource("parameters1") + public void writeWithSmallBuffer(boolean incrementalChunkList, int bufferSize) + throws IOException { + initClientConfig(incrementalChunkList); + + final String keyName = UUID.randomUUID().toString(); + int fileSize = 16 << 11; + String s = 
RandomStringUtils.randomAlphabetic(bufferSize); + ByteBuffer byteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); + + int writtenSize = 0; + try (OzoneOutputStream out = bucket.createKey(keyName, fileSize, + ReplicationConfig.getDefault(CONF), new HashMap<>())) { + while (writtenSize < fileSize) { + int len = Math.min(bufferSize, fileSize - writtenSize); + out.write(byteBuffer, 0, len); + out.hsync(); + writtenSize += bufferSize; + } + } + + OzoneKeyDetails keyInfo = bucket.getKey(keyName); + assertEquals(fileSize, keyInfo.getDataSize()); + + int readSize = 0; + try (OzoneInputStream is = bucket.readKey(keyName)) { + while (readSize < fileSize) { + int len = Math.min(bufferSize, fileSize - readSize); + ByteBuffer readBuffer = ByteBuffer.allocate(len); + int readLen = is.read(readBuffer); + assertEquals(len, readLen); + if (len < bufferSize) { + for (int i = 0; i < len; i++) { + assertEquals(readBuffer.array()[i], byteBuffer.array()[i]); + } + } else { + assertArrayEquals(readBuffer.array(), byteBuffer.array()); + } + readSize += readLen; + } + } + bucket.deleteKey(keyName); + } + + public static Stream parameters2() { + return Stream.of( + arguments(true, 1024 * 1024 + 1), + arguments(true, 1024 * 1024 + 1 + CHUNK_SIZE), + arguments(true, 1024 * 1024 - 1 + CHUNK_SIZE), + arguments(false, 1024 * 1024 + 1), + arguments(false, 1024 * 1024 + 1 + CHUNK_SIZE), + arguments(false, 1024 * 1024 - 1 + CHUNK_SIZE) + ); + } + + @ParameterizedTest + @MethodSource("parameters2") + public void writeWithBigBuffer(boolean incrementalChunkList, int bufferSize) + throws IOException { + initClientConfig(incrementalChunkList); + + final String keyName = UUID.randomUUID().toString(); + int count = 2; + int fileSize = bufferSize * count; + ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); + + try (OzoneOutputStream out = bucket.createKey(keyName, fileSize, + ReplicationConfig.getDefault(CONF), new HashMap<>())) { + for (int i = 0; i < count; i++) { + out.write(byteBuffer); + out.hsync(); + } + } + + OzoneKeyDetails keyInfo = bucket.getKey(keyName); + assertEquals(fileSize, keyInfo.getDataSize()); + int totalReadLen = 0; + try (OzoneInputStream is = bucket.readKey(keyName)) { + + for (int i = 0; i < count; i++) { + ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize); + int readLen = is.read(readBuffer); + if (bufferSize != readLen) { + throw new IOException("failed to read " + bufferSize + " from offset " + totalReadLen + + ", actually read " + readLen + ", block " + totalReadLen / + BLOCK_SIZE); + } + assertArrayEquals(byteBuffer.array(), readBuffer.array()); + totalReadLen += readLen; + } + } + bucket.deleteKey(keyName); + } }
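Usage sketch for reviewers: a minimal, hypothetical client-side example of how the new knob is meant to be used, mirroring what TestBlockOutputStreamIncrementalPutBlock and the new TestHSync cases do. Only OzoneClientConfig.setIncrementalChunkList() and the ozone.client.incremental.chunk.list key come from this patch; the volume, bucket and key names are placeholders and the rest is the existing client API.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

public class IncrementalChunkListExample {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Enable the incremental chunk list added by this patch
    // (equivalent to setting ozone.client.incremental.chunk.list = true).
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setIncrementalChunkList(true);
    clientConfig.setChecksumType(ContainerProtos.ChecksumType.CRC32C);
    conf.setFromObject(clientConfig);

    try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
      // Placeholder volume and bucket names; assumed to exist already.
      OzoneBucket bucket = client.getObjectStore()
          .getVolume("vol1").getBucket("bucket1");
      byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
      try (OzoneOutputStream out = bucket.createKey("key1", data.length,
          ReplicationConfig.getDefault(conf), new HashMap<>())) {
        out.write(data);
        // With the incremental chunk list enabled, the PutBlock triggered by
        // hsync() carries only the chunks added since the previous PutBlock,
        // instead of the whole chunk list of the block.
        out.hsync();
      }
    }
  }
}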