From a66fe3b3d45701919373e751b1dc5e893014daf1 Mon Sep 17 00:00:00 2001 From: "pony.chen" Date: Tue, 31 Oct 2023 20:48:50 +0800 Subject: [PATCH 01/17] HDDS-9526. Two S3G instances writing the same key may cause data loss in case of an exception. --- .../io/BlockDataStreamOutputEntryPool.java | 4 ++ .../client/io/BlockOutputStreamEntryPool.java | 4 ++ .../ozone/client/io/KeyDataStreamOutput.java | 27 ++++++- .../ozone/client/io/KeyOutputStream.java | 27 ++++++- .../ozone/client/protocol/ClientProtocol.java | 3 + .../hadoop/ozone/client/rpc/RpcClient.java | 20 +++++- .../hadoop/ozone/client/TestOzoneClient.java | 34 +++++++++ .../ozone/s3/endpoint/EndpointBase.java | 7 +- .../ozone/s3/endpoint/ObjectEndpoint.java | 19 +++++ .../ozone/client/ClientProtocolStub.java | 5 ++ .../hadoop/ozone/client/OzoneBucketStub.java | 2 +- .../endpoint/TestMultipartUploadWithCopy.java | 23 ++++++ .../ozone/s3/endpoint/TestObjectPut.java | 44 ++++++++++++ .../ozone/s3/endpoint/TestPartUpload.java | 71 ++++++++++++++++++- 14 files changed, 279 insertions(+), 11 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java index e51242cc107b..4a89be582e00 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java @@ -287,4 +287,8 @@ long computeBufferData() { } return totalDataLen; } + + public long getDataSize() { + return keyArgs.getDataSize(); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 8a1796c3f0a9..c3191aac73df 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -428,4 +428,8 @@ public ExcludeList getExcludeList() { boolean isEmpty() { return streamEntries.isEmpty(); } + + long getDataSize() { + return keyArgs.getDataSize(); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index edc76066b55a..5a731db81ed8 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -79,6 +79,14 @@ enum StreamAction { private long clientID; + /** + * Indicates if an atomic write is required. When set to true, + * the amount of data written must match the declared size during the commit. + * A mismatch will prevent the commit from succeeding. + * This is essential for operations like S3 put to ensure atomicity. 
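+   * Without this check, two S3G instances writing the same key could
+   * leave a partially written object committed after one of them hits
+   * an exception (see HDDS-9526).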
+ */ + private boolean requiresAtomicWrite; + @VisibleForTesting public List getStreamEntries() { return blockDataStreamOutputEntryPool.getStreamEntries(); @@ -107,7 +115,8 @@ public KeyDataStreamOutput( OzoneManagerProtocol omClient, int chunkSize, String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, - boolean unsafeByteBufferConversion + boolean unsafeByteBufferConversion, + boolean requiresAtomicWrite ) { super(HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval())); @@ -128,6 +137,7 @@ public KeyDataStreamOutput( // encrypted bucket. this.writeOffset = 0; this.clientID = handler.getId(); + this.requiresAtomicWrite = requiresAtomicWrite; } /** @@ -385,6 +395,12 @@ public void close() throws IOException { if (!isException()) { Preconditions.checkArgument(writeOffset == offset); } + if (requiresAtomicWrite) { + long expectedSize = blockDataStreamOutputEntryPool.getDataSize(); + Preconditions.checkArgument(expectedSize == offset, + String.format("Expected: %d and actual %d write sizes do not match", + expectedSize, offset)); + } blockDataStreamOutputEntryPool.commitKey(offset); } finally { blockDataStreamOutputEntryPool.cleanup(); @@ -415,6 +431,7 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; + private boolean requiresAtomicWrite = false; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -467,6 +484,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } + public Builder setRequiresAtomicWrite(boolean atomicWrite) { + this.requiresAtomicWrite = atomicWrite; + return this; + } + public KeyDataStreamOutput build() { return new KeyDataStreamOutput( clientConfig, @@ -479,7 +501,8 @@ public KeyDataStreamOutput build() { multipartUploadID, multipartNumber, isMultipartKey, - unsafeByteBufferConversion); + unsafeByteBufferConversion, + requiresAtomicWrite); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 2eea67656151..96804dcd541b 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -94,6 +94,14 @@ enum StreamAction { private long clientID; + /** + * Indicates if an atomic write is required. When set to true, + * the amount of data written must match the declared size during the commit. + * A mismatch will prevent the commit from succeeding. + * This is essential for operations like S3 put to ensure atomicity. 
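+   * For S3 requests the declared size comes from the Content-Length
+   * header, or from x-amz-decoded-content-length for signed-chunk
+   * uploads.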
+ */ + private boolean requiresAtomicWrite; + public KeyOutputStream(ReplicationConfig replicationConfig, ContainerClientMetrics clientMetrics) { this.replication = replicationConfig; @@ -141,7 +149,8 @@ public KeyOutputStream( String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion, - ContainerClientMetrics clientMetrics + ContainerClientMetrics clientMetrics, + boolean requiresAtomicWrite ) { this.config = config; this.replication = replicationConfig; @@ -162,6 +171,7 @@ public KeyOutputStream( this.isException = false; this.writeOffset = 0; this.clientID = handler.getId(); + this.requiresAtomicWrite = requiresAtomicWrite; } /** @@ -554,6 +564,12 @@ public synchronized void close() throws IOException { if (!isException) { Preconditions.checkArgument(writeOffset == offset); } + if (requiresAtomicWrite) { + long expectedSize = blockOutputStreamEntryPool.getDataSize(); + Preconditions.checkArgument(expectedSize == offset, + String.format("Expected: %d and actual %d write sizes do not match", + expectedSize, offset)); + } blockOutputStreamEntryPool.commitKey(offset); } finally { blockOutputStreamEntryPool.cleanup(); @@ -585,6 +601,7 @@ public static class Builder { private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; private ContainerClientMetrics clientMetrics; + private boolean requiresAtomicWrite = false; public String getMultipartUploadID() { return multipartUploadID; @@ -671,6 +688,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } + public Builder setRequiresAtomicWrite(boolean atomicWrite) { + this.requiresAtomicWrite = atomicWrite; + return this; + } + public Builder setClientMetrics(ContainerClientMetrics clientMetrics) { this.clientMetrics = clientMetrics; return this; @@ -692,7 +714,8 @@ public KeyOutputStream build() { multipartNumber, isMultipartKey, unsafeByteBufferConversion, - clientMetrics); + clientMetrics, + requiresAtomicWrite); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index b45a3209f418..bc01d6653ba9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -1008,6 +1008,9 @@ OzoneKey headObject(String volumeName, String bucketName, */ void setThreadLocalS3Auth(S3Auth s3Auth); + + void setIsS3Request(boolean isS3Request); + /** * Gets the S3 Authentication information that is attached to the thread. * @return S3 Authentication information. 
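A note on the client-side contract behind the new setIsS3Request(boolean)
declared above: once the proxy is flagged as serving an S3 request, a key
opened with a declared size must be written in full before close(), or the
commit is rejected instead of leaving a partial key behind. A minimal sketch
of the behavior, condensed from the new TestOzoneClient#testPutKeySizeMismatch
(client and bucket setup elided):

    client.getProxy().setIsS3Request(true);
    // Declare a 1024-byte key, but write only half of it.
    OzoneOutputStream out = bucket.createKey(keyName, 1024,
        ReplicationType.RATIS, ONE, new HashMap<>());
    out.write(new byte[512]);
    // close() now fails the atomic size check instead of committing a
    // truncated key:
    //   IllegalArgumentException: Expected: 1024 and actual 512
    //   write sizes do not match
    out.close();

On the S3G side, EndpointBase#initialization() sets this flag on the client
proxy for every request, so both plain PUTs and multipart part uploads get
the same guarantee.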
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index fc91335771e1..0d317503d18d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -42,6 +42,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -214,6 +215,7 @@ public class RpcClient implements ClientProtocol { private final OzoneManagerVersion omVersion; private volatile ExecutorService ecReconstructExecutor; private final ContainerClientMetrics clientMetrics; + private final AtomicBoolean isS3GRequest = new AtomicBoolean(false); /** * Creates RpcClient instance with the given configuration. @@ -689,7 +691,7 @@ public void createBucket( : "with server-side default bucket layout"; LOG.info("Creating Bucket: {}/{}, {}, {} as owner, Versioning {}, " + "Storage Type set to {} and Encryption set to {}, " + - "Replication Type set to {}, Namespace Quota set to {}, " + + "Replication Type set to {}, Namespace Quota set to {}, " + "Space Quota set to {} ", volumeName, bucketName, layoutMsg, owner, isVersionEnabled, storageType, bek != null, replicationType, @@ -1789,6 +1791,7 @@ public OzoneOutputStream createMultipartKey( .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) + .setRequiresAtomicWrite(isS3GRequest.get()) .build(); return createOutputStream(openKey, keyOutputStream); } @@ -1804,7 +1807,9 @@ public OzoneDataStreamOutput createMultipartStreamKey( throws IOException { final OpenKeySession openKey = newMultipartOpenKey( volumeName, bucketName, keyName, size, partNumber, uploadID); - + // Amazon S3 never adds partial objects, So for S3 requests we need to + // set RequiresAtomicWrite to true + // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html KeyDataStreamOutput keyOutputStream = new KeyDataStreamOutput.Builder() .setHandler(openKey) @@ -1816,6 +1821,7 @@ public OzoneDataStreamOutput createMultipartStreamKey( .setIsMultipartKey(true) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) + .setRequiresAtomicWrite(isS3GRequest.get()) .build(); keyOutputStream .addPreallocateBlocks( @@ -2218,6 +2224,9 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) throws IOException { final ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig(); + // Amazon S3 never adds partial objects, So for S3 requests we need to + // set RequiresAtomicWrite to true + // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html KeyDataStreamOutput keyOutputStream = new KeyDataStreamOutput.Builder() .setHandler(openKey) @@ -2226,6 +2235,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) .setReplicationConfig(replicationConfig) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) + .setRequiresAtomicWrite(isS3GRequest.get()) .build(); keyOutputStream .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), @@ -2307,6 +2317,7 @@ private KeyOutputStream.Builder createKeyOutputStream( 
.setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) + .setRequiresAtomicWrite(isS3GRequest.get()) .setClientMetrics(clientMetrics); } @@ -2385,6 +2396,11 @@ public void setThreadLocalS3Auth( ozoneManagerClient.setThreadLocalS3Auth(ozoneSharedSecretAuth); } + @Override + public void setIsS3Request(boolean s3Request) { + this.isS3GRequest.set(s3Request); + } + @Override public S3Auth getThreadLocalS3Auth() { return ozoneManagerClient.getThreadLocalS3Auth(); diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java index 542b8a8a9eca..d16d4a76402a 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java @@ -224,6 +224,40 @@ public void testPutKeyWithECReplicationConfig() throws IOException { } } + /** + * This test validates that for S3G, + * the key upload process needs to be atomic. + * It simulates two mismatch scenarios where the actual write data size does + * not match the expected size. + */ + @Test + public void testPutKeySizeMismatch() throws IOException { + String value = new String(new byte[1024], UTF_8); + OzoneBucket bucket = getOzoneBucket(); + String keyName = UUID.randomUUID().toString(); + try { + // Simulating first mismatch: Write less data than expected + client.getProxy().setIsS3Request(true); + OzoneOutputStream out1 = bucket.createKey(keyName, + value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, + new HashMap<>()); + out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8)); + Assertions.assertThrows(IllegalArgumentException.class, out1::close, + "Expected IllegalArgumentException due to size mismatch."); + + // Simulating second mismatch: Write more data than expected + OzoneOutputStream out2 = bucket.createKey(keyName, + value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE, + new HashMap<>()); + value += "1"; + out2.write(value.getBytes(UTF_8)); + Assertions.assertThrows(IllegalArgumentException.class, out2::close, + "Expected IllegalArgumentException due to size mismatch."); + } finally { + client.getProxy().setIsS3Request(false); + } + } + private OzoneBucket getOzoneBucket() throws IOException { String volumeName = UUID.randomUUID().toString(); String bucketName = UUID.randomUUID().toString(); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java index 97027abd7c81..846910c7bd08 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java @@ -123,9 +123,10 @@ public void initialization() { signatureInfo.getSignature(), signatureInfo.getAwsAccessId(), signatureInfo.getAwsAccessId()); LOG.debug("S3 access id: {}", s3Auth.getAccessID()); - getClient().getObjectStore() - .getClientProxy() - .setThreadLocalS3Auth(s3Auth); + ClientProtocol clientProtocol = + getClient().getObjectStore().getClientProxy(); + clientProtocol.setThreadLocalS3Auth(s3Auth); + clientProtocol.setIsS3Request(true); init(); } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 5ef67f3c6f48..f843515312a6 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -117,6 +117,7 @@ import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.PRECOND_FAILED; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError; import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER; +import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER_RANGE; @@ -266,6 +267,8 @@ public Response put( if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD" .equals(headers.getHeaderString("x-amz-content-sha256"))) { body = new SignedChunksInputStream(body); + length = Long.parseLong( + headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)); } long putLength = 0; if (datastreamEnabled && !enableEC && length > datastreamMinLength) { @@ -821,6 +824,8 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD" .equals(headers.getHeaderString("x-amz-content-sha256"))) { body = new SignedChunksInputStream(body); + length = Long.parseLong( + headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)); } copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER); @@ -843,6 +848,20 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, .createMultipartKey(ozoneBucket, key, length, partNumber, uploadID, chunkSize, body); } + + copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER); + if (copyHeader != null) { + String range = + headers.getHeaderString(COPY_SOURCE_HEADER_RANGE); + if (range != null) { + RangeHeader rangeHeader = + RangeHeaderParserUtil.parseRangeHeader(range, 0); + // When copy Range, the size of the target key is the + // length specified by COPY_SOURCE_HEADER_RANGE. 
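+        // For example "bytes=0-3" copies offsets 0 through 3 inclusive,
+        // i.e. 3 - 0 + 1 = 4 bytes.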
+ length = rangeHeader.getEndOffset() - + rangeHeader.getStartOffset() + 1; + } + } ozoneOutputStream = getClientProtocol().createMultipartKey( volume.getName(), bucket, key, length, partNumber, uploadID); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java index 5505688b26c1..71f478435079 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java @@ -577,6 +577,11 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) { } + @Override + public void setIsS3Request(boolean isS3Request) { + + } + @Override public S3Auth getThreadLocalS3Auth() { return null; diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java index 460b073b3928..9ac43800362b 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java @@ -513,7 +513,7 @@ public OzoneMultipartUploadPartListParts listParts(String key, if (partEntry.getKey() > partNumberMarker) { PartInfo partInfo = new PartInfo(partEntry.getKey(), partEntry.getValue().getPartName(), - partEntry.getValue().getContent().length, Time.now()); + Time.now(), partEntry.getValue().getContent().length); partInfoList.add(partInfo); count++; } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java index 7422806db743..7186ceb5578e 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientStub; +import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts; import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest.Part; import org.apache.hadoop.ozone.s3.exception.OS3Exception; @@ -385,6 +386,28 @@ private Part uploadPartWithCopy(String key, String uploadID, int partNumber, return part; } + @Test + public void testUploadWithRangeCopyContentLength() + throws IOException, OS3Exception { + // The contentLength specified when creating the Key should be the same as + // the Content-Length, the key Commit will compare the Content-Length with + // the actual length of the data written. 
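+    // Here the copy source range "bytes=0-3" fixes the expected part
+    // size at 4 bytes, which the final assertion verifies.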
+ + String uploadID = initiateMultipartUpload(KEY); + ByteArrayInputStream body = new ByteArrayInputStream("".getBytes(UTF_8)); + Map additionalHeaders = new HashMap<>(); + additionalHeaders.put(COPY_SOURCE_HEADER, + OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY); + additionalHeaders.put(COPY_SOURCE_HEADER_RANGE, "bytes=0-3"); + setHeaders(additionalHeaders); + REST.put(OzoneConsts.S3_BUCKET, KEY, 0, 1, uploadID, body); + OzoneMultipartUploadPartListParts parts = + CLIENT.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET) + .listParts(KEY, uploadID, 0, 100); + Assert.assertEquals(1, parts.getPartInfoList().size()); + Assert.assertEquals(4, parts.getPartInfoList().get(0).getSize()); + } + private void completeMultipartUpload(String key, CompleteMultipartUploadRequest completeMultipartUploadRequest, String uploadID) throws IOException, OS3Exception { diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 55c661084beb..53f867000ff7 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -49,6 +49,7 @@ import org.mockito.Mockito; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Utils.urlEncode; @@ -140,6 +141,47 @@ public void testPutObjectWithECReplicationConfig() Assert.assertEquals(CONTENT, keyContent); } + @Test + public void testPutObjectContentLength() throws IOException, OS3Exception { + // The contentLength specified when creating the Key should be the same as + // the Content-Length, the key Commit will compare the Content-Length with + // the actual length of the data written. 
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class); + ByteArrayInputStream body = + new ByteArrayInputStream(CONTENT.getBytes(UTF_8)); + objectEndpoint.setHeaders(headers); + long dataSize = CONTENT.length() + 1; + + objectEndpoint.put(bucketName, keyName, dataSize, 0, null, body); + Assert.assertEquals(dataSize + 1, getKeyDataSize(keyName)); + } + + @Test + public void testPutObjectContentLengthForStreaming() + throws IOException, OS3Exception { + HttpHeaders headers = Mockito.mock(HttpHeaders.class); + objectEndpoint.setHeaders(headers); + + String chunkedContent = "0a;chunk-signature=signature\r\n" + + "1234567890\r\n" + + "05;chunk-signature=signature\r\n" + + "abcde\r\n"; + + when(headers.getHeaderString("x-amz-content-sha256")) + .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"); + + when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) + .thenReturn("15"); + objectEndpoint.put(bucketName, keyName, chunkedContent.length(), 0, null, + new ByteArrayInputStream(chunkedContent.getBytes(UTF_8))); + Assert.assertEquals(15, getKeyDataSize(keyName)); + } + + private long getKeyDataSize(String key) throws IOException { + return clientStub.getObjectStore().getS3Bucket(bucketName) + .getKey(key).getDataSize(); + } + @Test public void testPutObjectWithSignedChunks() throws IOException, OS3Exception { //GIVEN @@ -153,6 +195,8 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception { when(headers.getHeaderString("x-amz-content-sha256")) .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"); + when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) + .thenReturn("15"); //WHEN Response response = objectEndpoint.put(bucketName, keyName, diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index 1b0e808f2d4d..6693c69d191e 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -24,7 +24,9 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientStub; +import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts; import org.apache.hadoop.ozone.s3.exception.OS3Exception; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -33,9 +35,11 @@ import javax.ws.rs.core.Response; import java.io.ByteArrayInputStream; +import java.io.IOException; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -49,11 +53,12 @@ public class TestPartUpload { private static final ObjectEndpoint REST = new ObjectEndpoint(); + private static OzoneClient client; @BeforeClass public static void setUp() throws Exception { - OzoneClient client = new OzoneClientStub(); + client = new OzoneClientStub(); client.getObjectStore().createS3Bucket(OzoneConsts.S3_BUCKET); @@ -89,6 +94,70 @@ public void testPartUpload() throws Exception { } + @Test + public void testPartUploadStreamContentLength() + throws IOException, OS3Exception { + 
HttpHeaders headers = Mockito.mock(HttpHeaders.class); + REST.setHeaders(headers); + + String chunkedContent = "0a;chunk-signature=signature\r\n" + + "1234567890\r\n" + + "05;chunk-signature=signature\r\n" + + "abcde\r\n"; + when(headers.getHeaderString("x-amz-content-sha256")) + .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"); + when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) + .thenReturn("15"); + + Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + String content = "Multipart Upload"; + long contentLength = chunkedContent.length(); + + ByteArrayInputStream body = + new ByteArrayInputStream(content.getBytes(UTF_8)); + REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + contentLength, 1, uploadID, + new ByteArrayInputStream(chunkedContent.getBytes(UTF_8))); + assertContentLength(uploadID, OzoneConsts.KEY, 15); + } + + @Test + public void testPartUploadContentLength() throws IOException, OS3Exception { + // The contentLength specified when creating the Key should be the same as + // the Content-Length, the key Commit will compare the Content-Length with + // the actual length of the data written. + + Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + String content = "Multipart Upload"; + long contentLength = content.length() + 1; + + ByteArrayInputStream body = + new ByteArrayInputStream(content.getBytes(UTF_8)); + REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + contentLength, 1, uploadID, body); + assertContentLength(uploadID, OzoneConsts.KEY, content.length()); + } + + private void assertContentLength(String uploadID, String key, + long contentLength) throws IOException { + OzoneMultipartUploadPartListParts parts = + client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET) + .listParts(key, uploadID, 0, 100); + Assert.assertEquals(1, parts.getPartInfoList().size()); + Assert.assertEquals(contentLength, + parts.getPartInfoList().get(0).getSize()); + } + @Test public void testPartUploadWithOverride() throws Exception { From c15cbbcde11e1a775fe9cff3322c3b36582118e3 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 01:08:54 +0800 Subject: [PATCH 02/17] fix test --- .../ozone/s3/endpoint/TestObjectPut.java | 4 +- .../ozone/s3/endpoint/TestPartUpload.java | 125 +++++++++--------- 2 files changed, 66 insertions(+), 63 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 53f867000ff7..4c1fd9a7aaf0 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -150,10 +150,10 @@ public void testPutObjectContentLength() throws IOException, OS3Exception { ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes(UTF_8)); objectEndpoint.setHeaders(headers); - 
long dataSize = CONTENT.length() + 1; + long dataSize = CONTENT.length(); objectEndpoint.put(bucketName, keyName, dataSize, 0, null, body); - Assert.assertEquals(dataSize + 1, getKeyDataSize(keyName)); + Assert.assertEquals(dataSize, getKeyDataSize(keyName)); } @Test diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index 6693c69d191e..4e625ba45a1f 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -36,6 +36,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.UUID; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.nio.charset.StandardCharsets.UTF_8; @@ -94,11 +95,63 @@ public void testPartUpload() throws Exception { } + @Test + public void testPartUploadWithOverride() throws Exception { + + Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, + OzoneConsts.KEY); + MultipartUploadInitiateResponse multipartUploadInitiateResponse = + (MultipartUploadInitiateResponse) response.getEntity(); + assertNotNull(multipartUploadInitiateResponse.getUploadID()); + String uploadID = multipartUploadInitiateResponse.getUploadID(); + + assertEquals(200, response.getStatus()); + + String content = "Multipart Upload"; + ByteArrayInputStream body = + new ByteArrayInputStream(content.getBytes(UTF_8)); + response = REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + content.length(), 1, uploadID, body); + + assertNotNull(response.getHeaderString("ETag")); + + String eTag = response.getHeaderString("ETag"); + + // Upload part again with same part Number, the ETag should be changed. 
+ content = "Multipart Upload Changed"; + response = REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + content.length(), 1, uploadID, body); + assertNotNull(response.getHeaderString("ETag")); + assertNotEquals(eTag, response.getHeaderString("ETag")); + + } + + + @Test + public void testPartUploadWithIncorrectUploadID() throws Exception { + try { + String content = "Multipart Upload With Incorrect uploadID"; + ByteArrayInputStream body = + new ByteArrayInputStream(content.getBytes(UTF_8)); + REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, content.length(), 1, + "random", body); + fail("testPartUploadWithIncorrectUploadID failed"); + } catch (OS3Exception ex) { + assertEquals("NoSuchUpload", ex.getCode()); + assertEquals(HTTP_NOT_FOUND, ex.getHttpCode()); + } + } + @Test public void testPartUploadStreamContentLength() throws IOException, OS3Exception { HttpHeaders headers = Mockito.mock(HttpHeaders.class); - REST.setHeaders(headers); + ObjectEndpoint objectEndpoint = new ObjectEndpoint(); + objectEndpoint.setHeaders(headers); + objectEndpoint.setClient(client); + objectEndpoint.setOzoneConfiguration(new OzoneConfiguration()); + objectEndpoint.setHeaders(headers); + String keyName = UUID.randomUUID().toString(); String chunkedContent = "0a;chunk-signature=signature\r\n" + "1234567890\r\n" @@ -109,21 +162,17 @@ public void testPartUploadStreamContentLength() when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) .thenReturn("15"); - Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, - OzoneConsts.KEY); + Response response = objectEndpoint.initializeMultipartUpload( + OzoneConsts.S3_BUCKET, keyName); MultipartUploadInitiateResponse multipartUploadInitiateResponse = (MultipartUploadInitiateResponse) response.getEntity(); assertNotNull(multipartUploadInitiateResponse.getUploadID()); String uploadID = multipartUploadInitiateResponse.getUploadID(); - String content = "Multipart Upload"; long contentLength = chunkedContent.length(); - ByteArrayInputStream body = - new ByteArrayInputStream(content.getBytes(UTF_8)); - REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, - contentLength, 1, uploadID, - new ByteArrayInputStream(chunkedContent.getBytes(UTF_8))); - assertContentLength(uploadID, OzoneConsts.KEY, 15); + objectEndpoint.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1, + uploadID, new ByteArrayInputStream(chunkedContent.getBytes())); + assertContentLength(uploadID, keyName, 15); } @Test @@ -132,20 +181,21 @@ public void testPartUploadContentLength() throws IOException, OS3Exception { // the Content-Length, the key Commit will compare the Content-Length with // the actual length of the data written. 
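    // With atomic key creation enabled for S3G requests, a declared length
    // that differs from the bytes actually written fails the part commit,
    // so the test now declares content.length() exactly.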
+ String keyName = UUID.randomUUID().toString(); Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, - OzoneConsts.KEY); + keyName); MultipartUploadInitiateResponse multipartUploadInitiateResponse = (MultipartUploadInitiateResponse) response.getEntity(); assertNotNull(multipartUploadInitiateResponse.getUploadID()); String uploadID = multipartUploadInitiateResponse.getUploadID(); String content = "Multipart Upload"; - long contentLength = content.length() + 1; + long contentLength = content.length(); ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes(UTF_8)); - REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, + REST.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1, uploadID, body); - assertContentLength(uploadID, OzoneConsts.KEY, content.length()); + assertContentLength(uploadID, keyName, content.length()); } private void assertContentLength(String uploadID, String key, @@ -157,51 +207,4 @@ private void assertContentLength(String uploadID, String key, Assert.assertEquals(contentLength, parts.getPartInfoList().get(0).getSize()); } - - @Test - public void testPartUploadWithOverride() throws Exception { - - Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET, - OzoneConsts.KEY); - MultipartUploadInitiateResponse multipartUploadInitiateResponse = - (MultipartUploadInitiateResponse) response.getEntity(); - assertNotNull(multipartUploadInitiateResponse.getUploadID()); - String uploadID = multipartUploadInitiateResponse.getUploadID(); - - assertEquals(200, response.getStatus()); - - String content = "Multipart Upload"; - ByteArrayInputStream body = - new ByteArrayInputStream(content.getBytes(UTF_8)); - response = REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, - content.length(), 1, uploadID, body); - - assertNotNull(response.getHeaderString("ETag")); - - String eTag = response.getHeaderString("ETag"); - - // Upload part again with same part Number, the ETag should be changed. 
- content = "Multipart Upload Changed"; - response = REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, - content.length(), 1, uploadID, body); - assertNotNull(response.getHeaderString("ETag")); - assertNotEquals(eTag, response.getHeaderString("ETag")); - - } - - - @Test - public void testPartUploadWithIncorrectUploadID() throws Exception { - try { - String content = "Multipart Upload With Incorrect uploadID"; - ByteArrayInputStream body = - new ByteArrayInputStream(content.getBytes(UTF_8)); - REST.put(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, content.length(), 1, - "random", body); - fail("testPartUploadWithIncorrectUploadID failed"); - } catch (OS3Exception ex) { - assertEquals("NoSuchUpload", ex.getCode()); - assertEquals(HTTP_NOT_FOUND, ex.getHttpCode()); - } - } } From 32c3a09f6b92506cdb89ab0e6bf0a7c9efc08707 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 10:06:08 +0800 Subject: [PATCH 03/17] findbugs --- .../org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index 4e625ba45a1f..fd2b8c000019 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -171,7 +171,7 @@ public void testPartUploadStreamContentLength() long contentLength = chunkedContent.length(); objectEndpoint.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1, - uploadID, new ByteArrayInputStream(chunkedContent.getBytes())); + uploadID, new ByteArrayInputStream(chunkedContent.getBytes(UTF_8))); assertContentLength(uploadID, keyName, 15); } From 931cd223398c19727de0f0eeaf32e8dd1e99334b Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 10:24:23 +0800 Subject: [PATCH 04/17] checkstyle --- .../java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 954406471359..19167498eeff 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -206,6 +206,7 @@ public void init() { * See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for * more details. 
*/ + @SuppressWarnings("checkstyle:MethodLength") @PUT public Response put( @PathParam("bucket") String bucketName, From 5f10ae44ff0bcf3da7963bf2bbe3d60396dfe7a6 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 15:37:06 +0800 Subject: [PATCH 05/17] Fix robot test --- .../ozone/s3/endpoint/ObjectEndpoint.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 19167498eeff..363849258bbe 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -889,8 +889,13 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, uploadID, chunkSize, (DigestInputStream) body); } - copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER); if (copyHeader != null) { + Pair result = parseSourceHeader(copyHeader); + String sourceBucket = result.getLeft(); + String sourceKey = result.getRight(); + + OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails( + volume.getName(), sourceBucket, sourceKey); String range = headers.getHeaderString(COPY_SOURCE_HEADER_RANGE); if (range != null) { @@ -900,19 +905,11 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, // length specified by COPY_SOURCE_HEADER_RANGE. length = rangeHeader.getEndOffset() - rangeHeader.getStartOffset() + 1; + } else { + length = sourceKeyDetails.getDataSize(); } - } - ozoneOutputStream = getClientProtocol().createMultipartKey( - volume.getName(), bucket, key, length, partNumber, uploadID); - - if (copyHeader != null) { - Pair result = parseSourceHeader(copyHeader); - - String sourceBucket = result.getLeft(); - String sourceKey = result.getRight(); - - OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails( - volume.getName(), sourceBucket, sourceKey); + ozoneOutputStream = getClientProtocol().createMultipartKey( + volume.getName(), bucket, key, length, partNumber, uploadID); Long sourceKeyModificationTime = sourceKeyDetails .getModificationTime().toEpochMilli(); String copySourceIfModifiedSince = @@ -925,9 +922,6 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, } try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) { - - String range = - headers.getHeaderString(COPY_SOURCE_HEADER_RANGE); long copyLength; if (range != null) { RangeHeader rangeHeader = @@ -951,6 +945,8 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, } } else { getMetrics().updatePutKeyMetadataStats(startNanos); + ozoneOutputStream = getClientProtocol().createMultipartKey( + volume.getName(), bucket, key, length, partNumber, uploadID); long putLength = IOUtils.copyLarge(body, ozoneOutputStream); ((KeyMetadataAware)ozoneOutputStream.getOutputStream()) .getMetadata().put("ETag", DatatypeConverter.printHexBinary( From abcf430fb7f108b46f093ed0bc94210c99ea0d57 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 16:18:54 +0800 Subject: [PATCH 06/17] rename requiresAtomicWrite to atomicKeyCreation --- .../ozone/client/io/KeyDataStreamOutput.java | 16 ++++++++-------- .../hadoop/ozone/client/io/KeyOutputStream.java | 16 ++++++++-------- .../hadoop/ozone/client/rpc/RpcClient.java | 12 ++++++------ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 3acbec8cf292..2368cd78e9aa 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -87,7 +87,7 @@ enum StreamAction { * A mismatch will prevent the commit from succeeding. * This is essential for operations like S3 put to ensure atomicity. */ - private boolean requiresAtomicWrite; + private boolean atomicKeyCreation; @VisibleForTesting public List getStreamEntries() { @@ -118,7 +118,7 @@ public KeyDataStreamOutput( String requestId, ReplicationConfig replicationConfig, String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion, - boolean requiresAtomicWrite + boolean atomicKeyCreation ) { super(HddsClientUtils.getRetryPolicyByException( config.getMaxRetryCount(), config.getRetryInterval())); @@ -139,7 +139,7 @@ public KeyDataStreamOutput( // encrypted bucket. this.writeOffset = 0; this.clientID = handler.getId(); - this.requiresAtomicWrite = requiresAtomicWrite; + this.atomicKeyCreation = atomicKeyCreation; } /** @@ -397,7 +397,7 @@ public void close() throws IOException { if (!isException()) { Preconditions.checkArgument(writeOffset == offset); } - if (requiresAtomicWrite) { + if (atomicKeyCreation) { long expectedSize = blockDataStreamOutputEntryPool.getDataSize(); Preconditions.checkArgument(expectedSize == offset, String.format("Expected: %d and actual %d write sizes do not match", @@ -438,7 +438,7 @@ public static class Builder { private boolean unsafeByteBufferConversion; private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; - private boolean requiresAtomicWrite = false; + private boolean atomicKeyCreation = false; public Builder setMultipartUploadID(String uploadID) { this.multipartUploadID = uploadID; @@ -491,8 +491,8 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setRequiresAtomicWrite(boolean atomicWrite) { - this.requiresAtomicWrite = atomicWrite; + public Builder setAtomicKeyCreation(boolean atomicKey) { + this.atomicKeyCreation = atomicKey; return this; } @@ -509,7 +509,7 @@ public KeyDataStreamOutput build() { multipartNumber, isMultipartKey, unsafeByteBufferConversion, - requiresAtomicWrite); + atomicKeyCreation); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 6194d114f994..2936c146a307 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -101,7 +101,7 @@ enum StreamAction { * A mismatch will prevent the commit from succeeding. * This is essential for operations like S3 put to ensure atomicity. 
*/ - private boolean requiresAtomicWrite; + private boolean atomicKeyCreation; public KeyOutputStream(ReplicationConfig replicationConfig, ContainerClientMetrics clientMetrics) { @@ -151,7 +151,7 @@ public KeyOutputStream( String uploadID, int partNumber, boolean isMultipart, boolean unsafeByteBufferConversion, ContainerClientMetrics clientMetrics, - boolean requiresAtomicWrite + boolean atomicKeyCreation ) { this.config = config; this.replication = replicationConfig; @@ -172,7 +172,7 @@ public KeyOutputStream( this.isException = false; this.writeOffset = 0; this.clientID = handler.getId(); - this.requiresAtomicWrite = requiresAtomicWrite; + this.atomicKeyCreation = atomicKeyCreation; } /** @@ -565,7 +565,7 @@ public synchronized void close() throws IOException { if (!isException) { Preconditions.checkArgument(writeOffset == offset); } - if (requiresAtomicWrite) { + if (atomicKeyCreation) { long expectedSize = blockOutputStreamEntryPool.getDataSize(); Preconditions.checkArgument(expectedSize == offset, String.format("Expected: %d and actual %d write sizes do not match", @@ -607,7 +607,7 @@ public static class Builder { private OzoneClientConfig clientConfig; private ReplicationConfig replicationConfig; private ContainerClientMetrics clientMetrics; - private boolean requiresAtomicWrite = false; + private boolean atomicKeyCreation = false; public String getMultipartUploadID() { return multipartUploadID; @@ -694,8 +694,8 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) { return this; } - public Builder setRequiresAtomicWrite(boolean atomicWrite) { - this.requiresAtomicWrite = atomicWrite; + public Builder setAtomicKeyCreation(boolean atomicKey) { + this.atomicKeyCreation = atomicKey; return this; } @@ -721,7 +721,7 @@ public KeyOutputStream build() { isMultipartKey, unsafeByteBufferConversion, clientMetrics, - requiresAtomicWrite); + atomicKeyCreation); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 0112d4c6fa2c..5498a46012de 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1789,7 +1789,7 @@ public OzoneOutputStream createMultipartKey( .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) - .setRequiresAtomicWrite(isS3GRequest.get()) + .setAtomicKeyCreation(isS3GRequest.get()) .build(); return createOutputStream(openKey, keyOutputStream); } @@ -1806,7 +1806,7 @@ public OzoneDataStreamOutput createMultipartStreamKey( final OpenKeySession openKey = newMultipartOpenKey( volumeName, bucketName, keyName, size, partNumber, uploadID); // Amazon S3 never adds partial objects, So for S3 requests we need to - // set RequiresAtomicWrite to true + // set atomicKeyCreation to true // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html KeyDataStreamOutput keyOutputStream = new KeyDataStreamOutput.Builder() @@ -1819,7 +1819,7 @@ public OzoneDataStreamOutput createMultipartStreamKey( .setIsMultipartKey(true) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) - .setRequiresAtomicWrite(isS3GRequest.get()) + .setAtomicKeyCreation(isS3GRequest.get()) .build(); keyOutputStream .addPreallocateBlocks( @@ -2223,7 +2223,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession 
openKey) final ReplicationConfig replicationConfig = openKey.getKeyInfo().getReplicationConfig(); // Amazon S3 never adds partial objects, So for S3 requests we need to - // set RequiresAtomicWrite to true + // set atomicKeyCreation to true // refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html KeyDataStreamOutput keyOutputStream = new KeyDataStreamOutput.Builder() @@ -2233,7 +2233,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey) .setReplicationConfig(replicationConfig) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) - .setRequiresAtomicWrite(isS3GRequest.get()) + .setAtomicKeyCreation(isS3GRequest.get()) .build(); keyOutputStream .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(), @@ -2315,7 +2315,7 @@ private KeyOutputStream.Builder createKeyOutputStream( .setOmClient(ozoneManagerClient) .enableUnsafeByteBufferConversion(unsafeByteBufferConversion) .setConfig(conf.getObject(OzoneClientConfig.class)) - .setRequiresAtomicWrite(isS3GRequest.get()) + .setAtomicKeyCreation(isS3GRequest.get()) .setClientMetrics(clientMetrics); } From cb7712fc3783f769454ad15ecb9db618d3b4c210 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 17:04:24 +0800 Subject: [PATCH 07/17] Catch the IllegalStateException when put#close() --- .../apache/hadoop/ozone/client/io/KeyOutputStream.java | 2 +- .../apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 2936c146a307..d80a7cc797a1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -567,7 +567,7 @@ public synchronized void close() throws IOException { } if (atomicKeyCreation) { long expectedSize = blockOutputStreamEntryPool.getDataSize(); - Preconditions.checkArgument(expectedSize == offset, + Preconditions.checkState(expectedSize == offset, String.format("Expected: %d and actual %d write sizes do not match", expectedSize, offset)); } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 363849258bbe..2d8183475c53 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -357,7 +357,14 @@ public Response put( throw ex; } finally { if (output != null) { - output.close(); + try { + output.close(); + } catch (IllegalStateException ex) { + LOG.error(String.format( + "Data read has a different length than the expected," + + " bucket %s, key %s", bucketName, keyPath), ex); + auditWriteFailure(s3GAction, ex); + } } if (auditSuccess) { AUDIT.logWriteSuccess( From ee231611b1f363961fa0ba8977e654f8583412b1 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Wed, 1 Nov 2023 17:22:35 +0800 Subject: [PATCH 08/17] Remove duplicate code --- .../java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java index fd2b8c000019..6ba1b557ec3e 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java @@ -150,7 +150,6 @@ public void testPartUploadStreamContentLength() objectEndpoint.setHeaders(headers); objectEndpoint.setClient(client); objectEndpoint.setOzoneConfiguration(new OzoneConfiguration()); - objectEndpoint.setHeaders(headers); String keyName = UUID.randomUUID().toString(); String chunkedContent = "0a;chunk-signature=signature\r\n" From 9605aad550911b8fc32b9d65c179e4db0e7f0d6a Mon Sep 17 00:00:00 2001 From: xichen01 Date: Fri, 3 Nov 2023 08:54:05 +0800 Subject: [PATCH 09/17] Fix test --- .../java/org/apache/hadoop/ozone/client/rpc/RpcClient.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 5498a46012de..1cb81bd47f1a 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1348,6 +1348,9 @@ public OzoneOutputStream createKey( .setLatestVersionLocation(getLatestVersionLocation); OpenKeySession openKey = ozoneManagerClient.openKey(builder.build()); + if (isS3GRequest.get() && size == 0) { + openKey.getKeyInfo().setDataSize(size); + } return createOutputStream(openKey); } From c4d3eb035ca24782133e72873b080b1571c7d762 Mon Sep 17 00:00:00 2001 From: xichen01 Date: Mon, 6 Nov 2023 12:55:45 +0800 Subject: [PATCH 10/17] Remove the explicit close for OzoneOutputStream --- .../ozone/client/io/KeyDataStreamOutput.java | 8 +- .../ozone/s3/endpoint/ObjectEndpoint.java | 171 ++++++++---------- .../s3/endpoint/ObjectEndpointStreaming.java | 27 +-- 3 files changed, 94 insertions(+), 112 deletions(-) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java index 2368cd78e9aa..608bc0cfa66d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java @@ -399,9 +399,11 @@ public void close() throws IOException { } if (atomicKeyCreation) { long expectedSize = blockDataStreamOutputEntryPool.getDataSize(); - Preconditions.checkArgument(expectedSize == offset, - String.format("Expected: %d and actual %d write sizes do not match", - expectedSize, offset)); + if (expectedSize != offset) { + LOG.info(String.format("Expected %d and actual %d write" + + " sizes do not match, skip key COMMIT for atomic key", + expectedSize, offset)); + } } blockDataStreamOutputEntryPool.commitKey(offset); } finally { diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 2d8183475c53..655b6c3a6bfe 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -64,7 +64,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.s3.HeaderPreprocessor; @@ -219,9 +218,6 @@ public Response put( S3GAction s3GAction = S3GAction.CREATE_KEY; boolean auditSuccess = true; - - OzoneOutputStream output = null; - String copyHeader = null, storageType = null; try { OzoneVolume volume = getVolume(); @@ -307,14 +303,16 @@ public Response put( eTag = keyWriteResult.getKey(); putLength = keyWriteResult.getValue(); } else { - output = getClientProtocol().createKey(volume.getName(), bucketName, - keyPath, length, replicationConfig, customMetadata); - getMetrics().updatePutKeyMetadataStats(startNanos); - putLength = IOUtils.copyLarge(body, output); - eTag = DatatypeConverter.printHexBinary( - ((DigestInputStream) body).getMessageDigest().digest()) - .toLowerCase(); - output.getMetadata().put(ETAG, eTag); + try (OzoneOutputStream output = getClientProtocol().createKey( + volume.getName(), bucketName, keyPath, length, replicationConfig, + customMetadata)) { + getMetrics().updatePutKeyMetadataStats(startNanos); + putLength = IOUtils.copyLarge(body, output); + eTag = DatatypeConverter.printHexBinary( + ((DigestInputStream) body).getMessageDigest().digest()) + .toLowerCase(); + output.getMetadata().put(ETAG, eTag); + } } getMetrics().incPutKeySuccessLength(putLength); @@ -356,16 +354,6 @@ public Response put( } throw ex; } finally { - if (output != null) { - try { - output.close(); - } catch (IllegalStateException ex) { - LOG.error(String.format( - "Data read has a different length than the expected," + - " bucket %s, key %s", bucketName, keyPath), ex); - auditWriteFailure(s3GAction, ex); - } - } if (auditSuccess) { AUDIT.logWriteSuccess( buildAuditMessageForSuccess(s3GAction, getAuditParameters())); @@ -856,6 +844,7 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket, } } + @SuppressWarnings("checkstyle:MethodLength") private Response createMultipartKey(OzoneVolume volume, String bucket, String key, long length, int partNumber, String uploadID, InputStream body) @@ -863,8 +852,6 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, long startNanos = Time.monotonicNowNanos(); String copyHeader = null; try { - OzoneOutputStream ozoneOutputStream = null; - if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD" .equals(headers.getHeaderString("x-amz-content-sha256"))) { body = new DigestInputStream(new SignedChunksInputStream(body), @@ -888,90 +875,90 @@ private Response createMultipartKey(OzoneVolume volume, String bucket, enableEC = true; } - try { - if (datastreamEnabled && !enableEC && copyHeader == null) { - getMetrics().updatePutKeyMetadataStats(startNanos); - return ObjectEndpointStreaming - .createMultipartKey(ozoneBucket, key, length, partNumber, - uploadID, chunkSize, (DigestInputStream) body); + if (datastreamEnabled && !enableEC && copyHeader == null) { + getMetrics().updatePutKeyMetadataStats(startNanos); + return ObjectEndpointStreaming + .createMultipartKey(ozoneBucket, key, length, partNumber, + uploadID, chunkSize, (DigestInputStream) body); + } + String eTag; + if 
@@ -856,6 +844,7 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket,
     }
   }
 
+  @SuppressWarnings("checkstyle:MethodLength")
   private Response createMultipartKey(OzoneVolume volume, String bucket,
       String key, long length, int partNumber, String uploadID,
       InputStream body)
@@ -863,8 +852,6 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
     long startNanos = Time.monotonicNowNanos();
     String copyHeader = null;
     try {
-      OzoneOutputStream ozoneOutputStream = null;
-
       if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
@@ -888,90 +875,90 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
         enableEC = true;
       }
 
-      try {
-        if (datastreamEnabled && !enableEC && copyHeader == null) {
-          getMetrics().updatePutKeyMetadataStats(startNanos);
-          return ObjectEndpointStreaming
-              .createMultipartKey(ozoneBucket, key, length, partNumber,
-                  uploadID, chunkSize, (DigestInputStream) body);
+      if (datastreamEnabled && !enableEC && copyHeader == null) {
+        getMetrics().updatePutKeyMetadataStats(startNanos);
+        return ObjectEndpointStreaming
+            .createMultipartKey(ozoneBucket, key, length, partNumber,
                uploadID, chunkSize, (DigestInputStream) body);
+      }
+      String eTag;
+      if (copyHeader != null) {
+        Pair<String, String> result = parseSourceHeader(copyHeader);
+        String sourceBucket = result.getLeft();
+        String sourceKey = result.getRight();
+
+        OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
+            volume.getName(), sourceBucket, sourceKey);
+        String range =
+            headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+        if (range != null) {
+          RangeHeader rangeHeader =
+              RangeHeaderParserUtil.parseRangeHeader(range, 0);
+          // When copy Range, the size of the target key is the
+          // length specified by COPY_SOURCE_HEADER_RANGE.
+          length = rangeHeader.getEndOffset() -
+              rangeHeader.getStartOffset() + 1;
+        } else {
+          length = sourceKeyDetails.getDataSize();
+        }
+        Long sourceKeyModificationTime = sourceKeyDetails
+            .getModificationTime().toEpochMilli();
+        String copySourceIfModifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
+        String copySourceIfUnmodifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
+        if (!checkCopySourceModificationTime(sourceKeyModificationTime,
+            copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
+          throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
         }
-        if (copyHeader != null) {
-          Pair<String, String> result = parseSourceHeader(copyHeader);
-          String sourceBucket = result.getLeft();
-          String sourceKey = result.getRight();
-
-          OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
-              volume.getName(), sourceBucket, sourceKey);
-          String range =
-              headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+        try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
+          long copyLength;
           if (range != null) {
             RangeHeader rangeHeader =
                 RangeHeaderParserUtil.parseRangeHeader(range, 0);
-            // When copy Range, the size of the target key is the
-            // length specified by COPY_SOURCE_HEADER_RANGE.
-            length = rangeHeader.getEndOffset() -
-                rangeHeader.getStartOffset() + 1;
-          } else {
-            length = sourceKeyDetails.getDataSize();
-          }
-          ozoneOutputStream = getClientProtocol().createMultipartKey(
-              volume.getName(), bucket, key, length, partNumber, uploadID);
-          Long sourceKeyModificationTime = sourceKeyDetails
-              .getModificationTime().toEpochMilli();
-          String copySourceIfModifiedSince =
-              headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
-          String copySourceIfUnmodifiedSince =
-              headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
-          if (!checkCopySourceModificationTime(sourceKeyModificationTime,
-              copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
-            throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
-          }
-
-          try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
-            long copyLength;
-            if (range != null) {
-              RangeHeader rangeHeader =
-                  RangeHeaderParserUtil.parseRangeHeader(range, 0);
-              final long skipped =
-                  sourceObject.skip(rangeHeader.getStartOffset());
-              if (skipped != rangeHeader.getStartOffset()) {
-                throw new EOFException(
-                    "Bytes to skip: "
-                        + rangeHeader.getStartOffset() + " actual: " + skipped);
-              }
+            final long skipped =
+                sourceObject.skip(rangeHeader.getStartOffset());
+            if (skipped != rangeHeader.getStartOffset()) {
+              throw new EOFException(
+                  "Bytes to skip: "
+                      + rangeHeader.getStartOffset() + " actual: " + skipped);
+            }
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
-              copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0,
-                  rangeHeader.getEndOffset() - rangeHeader.getStartOffset()
-                      + 1);
-            } else {
+              copyLength = IOUtils.copyLarge(
+                  sourceObject, ozoneOutputStream, 0, length);
+              eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
+            }
+          } else {
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
               copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+              eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
             }
-            getMetrics().incCopyObjectSuccessLength(copyLength);
           }
-        } else {
+          getMetrics().incCopyObjectSuccessLength(copyLength);
+        }
+      } else {
+        long putLength;
+        try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+            .createMultipartKey(volume.getName(), bucket, key, length,
+                partNumber, uploadID)) {
           getMetrics().updatePutKeyMetadataStats(startNanos);
-          ozoneOutputStream = getClientProtocol().createMultipartKey(
-              volume.getName(), bucket, key, length, partNumber, uploadID);
-          long putLength = IOUtils.copyLarge(body, ozoneOutputStream);
+          putLength = IOUtils.copyLarge(body, ozoneOutputStream);
           ((KeyMetadataAware)ozoneOutputStream.getOutputStream())
-              .getMetadata().put("ETag", DatatypeConverter.printHexBinary(
-                  ((DigestInputStream) body).getMessageDigest().digest())
+              .getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
+                  ((DigestInputStream) body).getMessageDigest().digest())
                   .toLowerCase());
-          getMetrics().incPutKeySuccessLength(putLength);
-        }
-      } finally {
-        if (ozoneOutputStream != null) {
-          ozoneOutputStream.close();
+          eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
         }
+        getMetrics().incPutKeySuccessLength(putLength);
       }
 
-      assert ozoneOutputStream != null;
-      OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
-          ozoneOutputStream.getCommitUploadPartInfo();
-      String eTag = omMultipartCommitUploadPartInfo.getPartName();
-
       if (copyHeader != null) {
         getMetrics().updateCopyObjectSuccessStats(startNanos);
         return Response.ok(new CopyPartResult(eTag)).build();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index ef87ad450d1c..f7356e241fd9 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
@@ -145,18 +144,19 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
     String uploadID, int chunkSize, DigestInputStream body)
     throws IOException, OS3Exception {
-    OzoneDataStreamOutput streamOutput = null;
     String eTag;
     S3GatewayMetrics metrics = S3GatewayMetrics.create();
     try {
-      streamOutput = ozoneBucket
-          .createMultipartStreamKey(key, length, partNumber, uploadID);
-      long putLength =
-          writeToStreamOutput(streamOutput, body, chunkSize, length);
-      eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
-          .toLowerCase();
-      ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
-      metrics.incPutKeySuccessLength(putLength);
+      try (OzoneDataStreamOutput streamOutput = ozoneBucket
+          .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+        long putLength =
+            writeToStreamOutput(streamOutput, body, chunkSize, length);
+        metrics.incPutKeySuccessLength(putLength);
+        ((KeyMetadataAware)streamOutput).getMetadata().put("ETag",
+            DatatypeConverter.printHexBinary(
+                body.getMessageDigest().digest()).toLowerCase());
+        eTag = streamOutput.getCommitUploadPartInfo().getPartName();
+      }
     } catch (OMException ex) {
       if (ex.getResult() ==
           OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
@@ -167,13 +167,6 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
           ozoneBucket.getName() + "/" + key);
       }
       throw ex;
-    } finally {
-      if (streamOutput != null) {
-        streamOutput.close();
-        OmMultipartCommitUploadPartInfo commitUploadPartInfo =
-            streamOutput.getCommitUploadPartInfo();
-        eTag = commitUploadPartInfo.getPartName();
-      }
     }
     return Response.ok().header("ETag", eTag).build();
   }
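Patch 10 replaces hand-written close() calls in finally blocks with try-with-resources. A sketch of the general pattern and its consequence, using generic java.io types rather than the S3G endpoint code:

    import java.io.IOException;
    import java.io.OutputStream;

    // Illustrative only: with try-with-resources, close() runs on every
    // path, and a close-time failure (such as the atomic size check)
    // propagates to the caller instead of being swallowed.
    final class TryWithResourcesPattern {
      static void writePart(OutputStream out, byte[] payload) throws IOException {
        try (OutputStream stream = out) {   // implicit close(), exceptions surface
          stream.write(payload);
        }
        // stream is guaranteed closed here, whether or not write() threw
      }
    }

The trade-off, which the next patch addresses, is that OmMultipartCommitUploadPartInfo is only populated once close() has committed the part, so it cannot be read from inside the block that owns the stream.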
From 806bfc90cd048fe4a4bcb9f6353217df2e32da97 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Mon, 6 Nov 2023 14:24:00 +0800
Subject: [PATCH 11/17] Fix some tests

---
 .../ozone/client/io/KeyDataStreamOutput.java  |  8 +++----
 .../client/io/OzoneDataStreamOutput.java      | 21 ++++++++++-------
 .../ozone/client/io/OzoneOutputStream.java    | 23 +++++++++++++------
 .../ozone/s3/endpoint/ObjectEndpoint.java     | 17 ++++++++++----
 .../s3/endpoint/ObjectEndpointStreaming.java  | 17 ++++++++++----
 5 files changed, 58 insertions(+), 28 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 608bc0cfa66d..2368cd78e9aa 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -399,11 +399,9 @@ public void close() throws IOException {
       }
       if (atomicKeyCreation) {
         long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
-        if (expectedSize != offset) {
-          LOG.info(String.format("Expected %d and actual %d write" +
-              " sizes do not match, skip key COMMIT for atomic key",
-              expectedSize, offset));
-        }
+        Preconditions.checkArgument(expectedSize == offset,
+            String.format("Expected: %d and actual %d write sizes do not match",
+                expectedSize, offset));
       }
       blockDataStreamOutputEntryPool.commitKey(offset);
     } finally {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d8cb06ecccf0..c0af1c530103 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -59,30 +59,35 @@ public synchronized void close() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
+    if (keyDataStreamOutput != null) {
+      return keyDataStreamOutput.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public KeyDataStreamOutput getKeyDataStreamOutput() {
     if (byteBufferStreamOutput instanceof OzoneOutputStream) {
       OutputStream outputStream =
           ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
       if (outputStream instanceof KeyDataStreamOutput) {
-        return ((KeyDataStreamOutput)
-            outputStream).getCommitUploadPartInfo();
+        return ((KeyDataStreamOutput) outputStream);
       } else if (outputStream instanceof CryptoOutputStream) {
         OutputStream wrappedStream =
             ((CryptoOutputStream) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       } else if (outputStream instanceof CipherOutputStreamOzone) {
         OutputStream wrappedStream =
             ((CipherOutputStreamOzone) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       }
     } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
-      return ((KeyDataStreamOutput)
-          byteBufferStreamOutput).getCommitUploadPartInfo();
+      return ((KeyDataStreamOutput) byteBufferStreamOutput);
     }
     // Otherwise return null.
     return null;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 093b31b7a52e..bd056185e754 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -123,29 +123,38 @@ public void hsync() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyOutputStream keyOutputStream = getKeyOutputStream();
+    if (keyOutputStream != null) {
+      return keyOutputStream.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  public KeyOutputStream getKeyOutputStream() {
     if (outputStream instanceof KeyOutputStream) {
-      return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
+      return ((KeyOutputStream) outputStream);
     } else if (outputStream instanceof CryptoOutputStream) {
       OutputStream wrappedStream =
           ((CryptoOutputStream) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream) wrappedStream);
       }
     } else if (outputStream instanceof CipherOutputStreamOzone) {
       OutputStream wrappedStream =
           ((CipherOutputStreamOzone) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream)wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream)wrappedStream);
       }
     }
     // Otherwise return null.
     return null;
   }
 
-  public OutputStream getOutputStream() {
-    return outputStream;
-  }
-
   @Override
   public Map<String, String> getMetadata() {
     if (outputStream instanceof CryptoOutputStream) {
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 655b6c3a6bfe..d4ab2ef9b76e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,11 +59,13 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
@@ -852,6 +854,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
     long startNanos = Time.monotonicNowNanos();
     String copyHeader = null;
     try {
+
       if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
@@ -881,7 +884,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
             .createMultipartKey(ozoneBucket, key, length, partNumber,
                 uploadID, chunkSize, (DigestInputStream) body);
       }
-      String eTag;
+      KeyOutputStream keyOutputStream = null;
       if (copyHeader != null) {
         Pair<String, String> result = parseSourceHeader(copyHeader);
         String sourceBucket = result.getLeft();
@@ -930,7 +933,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
             getMetrics().updateCopyKeyMetadataStats(startNanos);
             copyLength = IOUtils.copyLarge(
                 sourceObject, ozoneOutputStream, 0, length);
-            eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
+            keyOutputStream = ozoneOutputStream.getKeyOutputStream();
           }
         } else {
           try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
@@ -938,7 +941,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
               partNumber, uploadID)) {
             getMetrics().updateCopyKeyMetadataStats(startNanos);
             copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
-            eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
+            keyOutputStream = ozoneOutputStream.getKeyOutputStream();
           }
         }
         getMetrics().incCopyObjectSuccessLength(copyLength);
@@ -954,11 +957,17 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
             .getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
                 ((DigestInputStream) body).getMessageDigest().digest())
                 .toLowerCase());
-        eTag = ozoneOutputStream.getCommitUploadPartInfo().getPartName();
+        keyOutputStream
+            = ozoneOutputStream.getKeyOutputStream();
       }
       getMetrics().incPutKeySuccessLength(putLength);
     }
 
+    assert keyOutputStream != null;
+    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+        keyOutputStream.getCommitUploadPartInfo();
+    String eTag = omMultipartCommitUploadPartInfo.getPartName();
+
     if (copyHeader != null) {
       getMetrics().updateCopyObjectSuccessStats(startNanos);
       return Response.ok(new CopyPartResult(eTag)).build();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index f7356e241fd9..590689b2b77a 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -22,9 +22,11 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
@@ -146,16 +148,17 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
     throws IOException, OS3Exception {
     String eTag;
     S3GatewayMetrics metrics = S3GatewayMetrics.create();
+    KeyDataStreamOutput keyDataStreamOutput = null;
     try {
       try (OzoneDataStreamOutput streamOutput = ozoneBucket
           .createMultipartStreamKey(key, length, partNumber, uploadID)) {
         long putLength =
             writeToStreamOutput(streamOutput, body, chunkSize, length);
+        eTag = DatatypeConverter.printHexBinary(
+            body.getMessageDigest().digest()).toLowerCase();
+        ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
         metrics.incPutKeySuccessLength(putLength);
-        ((KeyMetadataAware)streamOutput).getMetadata().put("ETag",
-            DatatypeConverter.printHexBinary(
-                body.getMessageDigest().digest()).toLowerCase());
-        eTag = streamOutput.getCommitUploadPartInfo().getPartName();
+        keyDataStreamOutput = streamOutput.getKeyDataStreamOutput();
       }
     } catch (OMException ex) {
       if (ex.getResult() ==
@@ -167,6 +170,12 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
           ozoneBucket.getName() + "/" + key);
       }
       throw ex;
+    } finally {
+      if (keyDataStreamOutput != null) {
+        OmMultipartCommitUploadPartInfo commitUploadPartInfo =
+            keyDataStreamOutput.getCommitUploadPartInfo();
+        eTag = commitUploadPartInfo.getPartName();
+      }
     }
     return Response.ok().header("ETag", eTag).build();
   }
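Patch 11 restores the Preconditions check and introduces the getKeyOutputStream()/getKeyDataStreamOutput() accessors so the caller can hold the innermost stream across close() and read the commit info afterwards. A self-contained, runnable analogue of that pattern (all names illustrative, not the Ozone classes):

    import java.io.ByteArrayOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;

    // Take a reference to the inner stream while the wrapper is open, close
    // the wrapper, then read state that only becomes valid after close().
    final class CommitInfoAfterClose {
      static final class InnerStream extends ByteArrayOutputStream {
        private String partName;                 // only set by close()

        @Override
        public void close() throws IOException {
          super.close();
          partName = "part-" + size();           // stands in for the OM commit
        }

        String getPartName() {
          return partName;
        }
      }

      public static void main(String[] args) throws IOException {
        InnerStream inner = new InnerStream();
        try (FilterOutputStream wrapper = new FilterOutputStream(inner)) {
          wrapper.write(new byte[] {1, 2, 3});
          // inner reference is retained here, like getKeyOutputStream()
        }
        System.out.println(inner.getPartName()); // valid only after close()
      }
    }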
From 24522a752e35f3037c3bb7dff67d5bd5c943cd1e Mon Sep 17 00:00:00 2001
From: xichen01
Date: Mon, 6 Nov 2023 18:03:37 +0800
Subject: [PATCH 12/17] Fix unit test

---
 .../java/org/apache/hadoop/ozone/client/TestOzoneClient.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index d16d4a76402a..c7a09cd2ce03 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -242,7 +242,7 @@ public void testPutKeySizeMismatch() throws IOException {
           value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
           new HashMap<>());
       out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
-      Assertions.assertThrows(IllegalArgumentException.class, out1::close,
-          "Expected IllegalArgumentException due to size mismatch.");
+      Assertions.assertThrows(IllegalStateException.class, out1::close,
+          "Expected IllegalStateException due to size mismatch.");
 
       // Simulating second mismatch: Write more data than expected
@@ -251,7 +251,7 @@ public void testPutKeySizeMismatch() throws IOException {
           new HashMap<>());
       value += "1";
       out2.write(value.getBytes(UTF_8));
-      Assertions.assertThrows(IllegalArgumentException.class, out2::close,
-          "Expected IllegalArgumentException due to size mismatch.");
+      Assertions.assertThrows(IllegalStateException.class, out2::close,
+          "Expected IllegalStateException due to size mismatch.");
     } finally {
       client.getProxy().setIsS3Request(false);
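Since the size mismatch now surfaces from close() rather than from write(), the tests assert on the close call. A minimal JUnit 5 sketch of the same pattern, reusing the hypothetical AtomicWriteCheck class from the sketch after patch 09:

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;

    // Illustrative only (not the real TestOzoneClient): writes succeed, and
    // the mismatch is detected when close() runs the atomic check.
    class SizeMismatchCloseTest {
      @Test
      void mismatchSurfacesOnClose() {
        AtomicWriteCheck out = new AtomicWriteCheck(4);   // declared 4 bytes
        out.write(new byte[3]);                           // write only 3
        Assertions.assertThrows(IllegalStateException.class, out::close,
            "Expected IllegalStateException due to size mismatch.");
      }
    }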
From d806d1492e0e7b23aaa4d48f4ece39825530dda2 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Mon, 6 Nov 2023 20:06:27 +0800
Subject: [PATCH 13/17] Simplify some code

---
 .../apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index d4ab2ef9b76e..b479b48e9239 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -289,8 +289,7 @@ public Response put(
         .equals(headers.getHeaderString("x-amz-content-sha256"))) {
       body = new DigestInputStream(new SignedChunksInputStream(body),
           E_TAG_PROVIDER.get());
-      length = Long.parseLong(
-          headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
+      length = Long.parseLong(amzDecodedLength);
     } else {
       body = new DigestInputStream(body, E_TAG_PROVIDER.get());
     }
@@ -894,9 +893,9 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
           volume.getName(), sourceBucket, sourceKey);
       String range =
           headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+      RangeHeader rangeHeader = null;
       if (range != null) {
-        RangeHeader rangeHeader =
-            RangeHeaderParserUtil.parseRangeHeader(range, 0);
+        rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
         // When copy Range, the size of the target key is the
         // length specified by COPY_SOURCE_HEADER_RANGE.
         length = rangeHeader.getEndOffset() -
@@ -918,8 +917,6 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
       try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
         long copyLength;
         if (range != null) {
-          RangeHeader rangeHeader =
-              RangeHeaderParserUtil.parseRangeHeader(range, 0);
           final long skipped =
               sourceObject.skip(rangeHeader.getStartOffset());
           if (skipped != rangeHeader.getStartOffset()) {

From 5cc207692c9a988e6eb6b62c1095b26a40293bd0 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Mon, 6 Nov 2023 23:22:24 +0800
Subject: [PATCH 14/17] Fix test

---
 .../ozone/client/OzoneOutputStreamStub.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 326d388b88e9..00a7ba557490 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -20,6 +20,9 @@
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
@@ -69,6 +72,18 @@ public synchronized void close() throws IOException {
     }
   }
 
+  @Override
+  public KeyOutputStream getKeyOutputStream() {
+    return new KeyOutputStream(
+        ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+      @Override
+      public synchronized OmMultipartCommitUploadPartInfo
+          getCommitUploadPartInfo() {
+        return OzoneOutputStreamStub.this.getCommitUploadPartInfo();
+      }
+    };
+  }
+
   @Override
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
     return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
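The stub satisfies the new getKeyOutputStream() accessor by returning an anonymous subclass that routes getCommitUploadPartInfo() back to the stub's own state. A self-contained sketch of this test-double trick (illustrative types, not the Ozone classes):

    // An anonymous subclass overrides one accessor and delegates back to the
    // enclosing fake, so production code that unwraps the stream still
    // observes the stub's state.
    final class StubDelegation {
      interface PartInfo { String name(); }

      static class RealOutput {
        PartInfo getCommitUploadPartInfo() {
          return null;                    // the real class would ask OM
        }
      }

      static class StubOutput {
        private final String partName = "stub-part";

        RealOutput asRealOutput() {
          return new RealOutput() {
            @Override
            PartInfo getCommitUploadPartInfo() {
              return () -> partName;      // delegate to the stub's state
            }
          };
        }
      }

      public static void main(String[] args) {
        System.out.println(
            new StubOutput().asRealOutput().getCommitUploadPartInfo().name());
      }
    }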
From 60072d241e26f8e74b8ff8be383f161184b67ad8 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Tue, 7 Nov 2023 00:32:27 +0800
Subject: [PATCH 15/17] Support for EC key commit checking

---
 .../hadoop/ozone/client/io/ECKeyOutputStream.java | 15 +++++++++++++++
 .../hadoop/ozone/client/io/KeyOutputStream.java   |  4 ++++
 2 files changed, 19 insertions(+)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 242b2606f8fc..15ebccda2886 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -78,6 +78,14 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final Future flushFuture;
   private final AtomicLong flushCheckpoint;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   private enum StripeWriteStatus {
     SUCCESS,
     FAILED
@@ -155,6 +163,7 @@ private ECKeyOutputStream(Builder builder) {
     flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
     this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
     this.flushCheckpoint = new AtomicLong(0);
+    this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
   /**
@@ -512,6 +521,12 @@ public void close() throws IOException {
         Preconditions.checkArgument(writeOffset == offset,
             "Expected writeOffset= " + writeOffset
                 + " Expected offset=" + offset);
+        if (atomicKeyCreation) {
+          long expectedSize = blockOutputStreamEntryPool.getDataSize();
+          Preconditions.checkState(expectedSize == offset, String.format(
+              "Expected: %d and actual %d write sizes do not match",
+              expectedSize, offset));
+        }
         blockOutputStreamEntryPool.commitKey(offset);
       }
     } catch (ExecutionException e) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index d80a7cc797a1..4e0c4c91faed 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -708,6 +708,10 @@ public ContainerClientMetrics getClientMetrics() {
       return clientMetrics;
     }
 
+    public boolean getAtomicKeyCreation() {
+      return atomicKeyCreation;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(
           clientConfig,
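Note that Guava's Preconditions.checkState throws IllegalStateException, which matches what the updated testPutKeySizeMismatch expects from close(). A minimal sketch of the guard in isolation (illustrative values only, not the EC stream code):

    import com.google.common.base.Preconditions;

    // The commit is only reached when the written offset equals the size
    // declared when the key was opened; otherwise checkState throws.
    final class EcCommitGuard {
      public static void main(String[] args) {
        long expectedSize = 8;
        long offset = 8;
        Preconditions.checkState(expectedSize == offset, String.format(
            "Expected: %d and actual %d write sizes do not match",
            expectedSize, offset));
        System.out.println("commitKey(" + offset + ")");
      }
    }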
From f0fe1ee01dcce2ab5cdf804d0d5bd2759b76a234 Mon Sep 17 00:00:00 2001
From: xichen01
Date: Tue, 7 Nov 2023 12:04:01 +0800
Subject: [PATCH 16/17] Empty commit message

From d6a89bd902725e9d74da6f6844c75c2a401e9dec Mon Sep 17 00:00:00 2001
From: xichen01
Date: Wed, 8 Nov 2023 11:22:17 +0800
Subject: [PATCH 17/17] Add some comments

---
 .../java/org/apache/hadoop/ozone/client/rpc/RpcClient.java   | 4 ++++
 .../org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java  | 4 ++++
 .../hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java    | 4 ++++
 3 files changed, 12 insertions(+)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1cb81bd47f1a..5d70bdfb86ae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1348,6 +1348,10 @@ public OzoneOutputStream createKey(
         .setLatestVersionLocation(getLatestVersionLocation);
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
 
+    // For a bucket with the OBJECT_STORE layout, creating an empty file
+    // (size = 0) makes OM set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+    // which would cause S3G's atomic write length check to fail,
+    // so reset the size to 0 here.
     if (isS3GRequest.get() && size == 0) {
       openKey.getKeyInfo().setDataSize(size);
     }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index b479b48e9239..d85a628ea3e3 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -883,6 +883,10 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
             .createMultipartKey(ozoneBucket, key, length, partNumber,
                 uploadID, chunkSize, (DigestInputStream) body);
       }
+      // OmMultipartCommitUploadPartInfo can only be obtained after the
+      // OzoneOutputStream is closed, so save the KeyOutputStream held by the
+      // OzoneOutputStream and use it to get the
+      // OmMultipartCommitUploadPartInfo once the stream has been closed.
       KeyOutputStream keyOutputStream = null;
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index 590689b2b77a..b536b3248b8e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -148,6 +148,10 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
     throws IOException, OS3Exception {
     String eTag;
     S3GatewayMetrics metrics = S3GatewayMetrics.create();
+    // OmMultipartCommitUploadPartInfo can only be obtained after the
+    // OzoneDataStreamOutput is closed, so save the KeyDataStreamOutput held
+    // by the OzoneDataStreamOutput and use it to get the
+    // OmMultipartCommitUploadPartInfo once the stream has been closed.
     KeyDataStreamOutput keyDataStreamOutput = null;
     try {
       try (OzoneDataStreamOutput streamOutput = ozoneBucket