diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 10b16f800d90..d4bccd55350b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -289,6 +289,10 @@ long computeBufferData() {
     return totalDataLen;
   }
 
+  public long getDataSize() {
+    return keyArgs.getDataSize();
+  }
+
   @Override
   public Map<String, String> getMetadata() {
     return this.keyArgs.getMetadata();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 7b0259e379d5..65c1cd4caa2c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -438,4 +438,7 @@ public Map<String, String> getMetadata() {
     return null;
   }
 
+  long getDataSize() {
+    return keyArgs.getDataSize();
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 242b2606f8fc..15ebccda2886 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -78,6 +78,14 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   private enum StripeWriteStatus {
     SUCCESS,
     FAILED
@@ -155,6 +163,7 @@ private ECKeyOutputStream(Builder builder) {
     flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
     this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
     this.flushCheckpoint = new AtomicLong(0);
+    this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
   /**
@@ -512,6 +521,12 @@ public void close() throws IOException {
         Preconditions.checkArgument(writeOffset == offset,
             "Expected writeOffset= " + writeOffset
                 + " Expected offset=" + offset);
+        if (atomicKeyCreation) {
+          long expectedSize = blockOutputStreamEntryPool.getDataSize();
+          Preconditions.checkState(expectedSize == offset, String.format(
+              "Expected: %d and actual %d write sizes do not match",
+              expectedSize, offset));
+        }
         blockOutputStreamEntryPool.commitKey(offset);
       }
     } catch (ExecutionException e) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a6331151e303..2368cd78e9aa 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -81,6 +81,14 @@ enum StreamAction {
 
   private long clientID;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   @VisibleForTesting
   public List<BlockDataStreamOutputEntry> getStreamEntries() {
     return blockDataStreamOutputEntryPool.getStreamEntries();
   }
@@ -109,7 +117,8 @@ public KeyDataStreamOutput(
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion
+      boolean unsafeByteBufferConversion,
+      boolean atomicKeyCreation
   ) {
     super(HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval()));
@@ -130,6 +139,7 @@ public KeyDataStreamOutput(
     // encrypted bucket.
     this.writeOffset = 0;
     this.clientID = handler.getId();
+    this.atomicKeyCreation = atomicKeyCreation;
   }
 
   /**
@@ -387,6 +397,12 @@ public void close() throws IOException {
       if (!isException()) {
         Preconditions.checkArgument(writeOffset == offset);
       }
+      if (atomicKeyCreation) {
+        long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
+        Preconditions.checkState(expectedSize == offset,
+            String.format("Expected: %d and actual %d write sizes do not match",
+                expectedSize, offset));
+      }
       blockDataStreamOutputEntryPool.commitKey(offset);
     } finally {
       blockDataStreamOutputEntryPool.cleanup();
@@ -422,6 +438,7 @@ public static class Builder {
     private boolean unsafeByteBufferConversion;
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
+    private boolean atomicKeyCreation = false;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
       return this;
     }
@@ -474,6 +491,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
       return this;
     }
 
+    public Builder setAtomicKeyCreation(boolean atomicKey) {
+      this.atomicKeyCreation = atomicKey;
+      return this;
+    }
+
     public KeyDataStreamOutput build() {
       return new KeyDataStreamOutput(
           clientConfig,
@@ -486,7 +508,8 @@ public KeyDataStreamOutput build() {
           multipartUploadID,
           multipartNumber,
           isMultipartKey,
-          unsafeByteBufferConversion);
+          unsafeByteBufferConversion,
+          atomicKeyCreation);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index fa23f885443b..4e0c4c91faed 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -95,6 +95,14 @@ enum StreamAction {
 
   private long clientID;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   public KeyOutputStream(ReplicationConfig replicationConfig,
       ContainerClientMetrics clientMetrics) {
     this.replication = replicationConfig;
@@ -142,7 +150,8 @@ public KeyOutputStream(
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
       boolean unsafeByteBufferConversion,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics,
+      boolean atomicKeyCreation
   ) {
     this.config = config;
     this.replication = replicationConfig;
@@ -163,6 +172,7 @@ public KeyOutputStream(
     this.isException = false;
     this.writeOffset = 0;
     this.clientID = handler.getId();
+    this.atomicKeyCreation = atomicKeyCreation;
   }
 
   /**
@@ -555,6 +565,12 @@ public synchronized void close() throws IOException {
       if (!isException) {
         Preconditions.checkArgument(writeOffset == offset);
       }
+      if (atomicKeyCreation) {
+        long expectedSize = blockOutputStreamEntryPool.getDataSize();
+        Preconditions.checkState(expectedSize == offset,
+            String.format("Expected: %d and actual %d write sizes do not match",
+                expectedSize, offset));
+      }
       blockOutputStreamEntryPool.commitKey(offset);
     } finally {
       blockOutputStreamEntryPool.cleanup();
@@ -591,6 +607,7 @@ public static class Builder {
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
     private ContainerClientMetrics clientMetrics;
+    private boolean atomicKeyCreation = false;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
     }
@@ -677,6 +694,11 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
       return this;
     }
 
+    public Builder setAtomicKeyCreation(boolean atomicKey) {
+      this.atomicKeyCreation = atomicKey;
+      return this;
+    }
+
     public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
       this.clientMetrics = clientMetrics;
       return this;
     }
@@ -686,6 +708,10 @@ public ContainerClientMetrics getClientMetrics() {
       return clientMetrics;
     }
 
+    public boolean getAtomicKeyCreation() {
+      return atomicKeyCreation;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(
           clientConfig,
@@ -698,7 +724,8 @@ public KeyOutputStream build() {
           multipartNumber,
           isMultipartKey,
           unsafeByteBufferConversion,
-          clientMetrics);
+          clientMetrics,
+          atomicKeyCreation);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d8cb06ecccf0..c0af1c530103 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -59,30 +59,35 @@ public synchronized void close() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
+    if (keyDataStreamOutput != null) {
+      return keyDataStreamOutput.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public KeyDataStreamOutput getKeyDataStreamOutput() {
     if (byteBufferStreamOutput instanceof OzoneOutputStream) {
       OutputStream outputStream =
           ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
       if (outputStream instanceof KeyDataStreamOutput) {
-        return ((KeyDataStreamOutput)
-            outputStream).getCommitUploadPartInfo();
+        return ((KeyDataStreamOutput) outputStream);
       } else if (outputStream instanceof CryptoOutputStream) {
         OutputStream wrappedStream =
             ((CryptoOutputStream) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       } else if (outputStream instanceof CipherOutputStreamOzone) {
         OutputStream wrappedStream =
             ((CipherOutputStreamOzone) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       }
     } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
-      return ((KeyDataStreamOutput)
-          byteBufferStreamOutput).getCommitUploadPartInfo();
+      return ((KeyDataStreamOutput) byteBufferStreamOutput);
     }
     // Otherwise return null.
     return null;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 093b31b7a52e..bd056185e754 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -123,29 +123,38 @@ public void hsync() throws IOException {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyOutputStream keyOutputStream = getKeyOutputStream();
+    if (keyOutputStream != null) {
+      return keyOutputStream.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  public KeyOutputStream getKeyOutputStream() {
     if (outputStream instanceof KeyOutputStream) {
-      return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
+      return ((KeyOutputStream) outputStream);
     } else if (outputStream instanceof CryptoOutputStream) {
       OutputStream wrappedStream =
           ((CryptoOutputStream) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream) wrappedStream);
       }
     } else if (outputStream instanceof CipherOutputStreamOzone) {
       OutputStream wrappedStream =
           ((CipherOutputStreamOzone) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream)wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream)wrappedStream);
       }
     }
     // Otherwise return null.
     return null;
   }
 
-  public OutputStream getOutputStream() {
-    return outputStream;
-  }
-
   @Override
   public Map<String, String> getMetadata() {
     if (outputStream instanceof CryptoOutputStream) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index b45a3209f418..bc01d6653ba9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1008,6 +1008,9 @@ OzoneKey headObject(String volumeName, String bucketName,
    */
   void setThreadLocalS3Auth(S3Auth s3Auth);
 
+  /**
+   * Marks the requests made through this client as S3 requests, which makes
+   * key writes atomic: the written size must match the declared size.
+   */
+  void setIsS3Request(boolean isS3Request);
+
   /**
    * Gets the S3 Authentication information that is attached to the thread.
    * @return S3 Authentication information.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ad8ced95d121..5d70bdfb86ae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -160,6 +160,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
@@ -212,6 +213,7 @@ public class RpcClient implements ClientProtocol {
   private final OzoneManagerVersion omVersion;
   private volatile ExecutorService ecReconstructExecutor;
   private final ContainerClientMetrics clientMetrics;
+  private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -687,7 +689,7 @@ public void createBucket(
         : "with server-side default bucket layout";
     LOG.info("Creating Bucket: {}/{}, {}, {} as owner, Versioning {}, " +
             "Storage Type set to {} and Encryption set to {}, " +
-            "Replication Type set to {}, Namespace Quota set to {}, " +
+            "Replication Type set to {}, Namespace Quota set to {}, " +
             "Space Quota set to {} ", volumeName, bucketName, layoutMsg,
         owner, isVersionEnabled, storageType, bek != null, replicationType,
@@ -1346,6 +1348,13 @@ public OzoneOutputStream createKey(
         .setLatestVersionLocation(getLatestVersionLocation);
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
 
+    // For a bucket with the OBJECT_STORE layout, when creating an empty file
+    // (size = 0), OM sets DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+    // which would make S3G's atomic write length check fail,
+    // so reset the size to 0 here.
+    if (isS3GRequest.get() && size == 0) {
+      openKey.getKeyInfo().setDataSize(size);
+    }
     return createOutputStream(openKey);
   }
 
@@ -1787,6 +1796,7 @@ public OzoneOutputStream createMultipartKey(
             .setMultipartNumber(partNumber)
             .setMultipartUploadID(uploadID)
             .setIsMultipartKey(true)
+            .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     return createOutputStream(openKey, keyOutputStream);
   }
@@ -1802,7 +1812,9 @@ public OzoneDataStreamOutput createMultipartStreamKey(
       throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
         volumeName, bucketName, keyName, size, partNumber, uploadID);
-
+    // Amazon S3 never adds partial objects, so for S3 requests we need to
+    // set atomicKeyCreation to true.
+    // Refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
     KeyDataStreamOutput keyOutputStream =
         new KeyDataStreamOutput.Builder()
             .setHandler(openKey)
@@ -1814,6 +1826,7 @@
             .setIsMultipartKey(true)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
         .addPreallocateBlocks(
@@ -2216,6 +2229,9 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
       throws IOException {
     final ReplicationConfig replicationConfig =
         openKey.getKeyInfo().getReplicationConfig();
+    // Amazon S3 never adds partial objects, so for S3 requests we need to
+    // set atomicKeyCreation to true.
+    // Refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
     KeyDataStreamOutput keyOutputStream =
         new KeyDataStreamOutput.Builder()
             .setHandler(openKey)
@@ -2224,6 +2240,7 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
             .setReplicationConfig(replicationConfig)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
@@ -2305,6 +2322,7 @@ private KeyOutputStream.Builder createKeyOutputStream(
         .setOmClient(ozoneManagerClient)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
         .setConfig(conf.getObject(OzoneClientConfig.class))
+        .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics);
   }
 
@@ -2383,6 +2401,11 @@ public void setThreadLocalS3Auth(
     ozoneManagerClient.setThreadLocalS3Auth(ozoneSharedSecretAuth);
   }
 
+  @Override
+  public void setIsS3Request(boolean s3Request) {
+    this.isS3GRequest.set(s3Request);
+  }
+
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return ozoneManagerClient.getThreadLocalS3Auth();
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index 542b8a8a9eca..c7a09cd2ce03 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -224,6 +224,40 @@ public void testPutKeyWithECReplicationConfig() throws IOException {
     }
   }
 
+  /**
+   * This test validates that for S3G
+   * the key upload process is atomic.
+   * It simulates two mismatch scenarios where the actual written data
+   * size does not match the expected size.
+   */
+  @Test
+  public void testPutKeySizeMismatch() throws IOException {
+    String value = new String(new byte[1024], UTF_8);
+    OzoneBucket bucket = getOzoneBucket();
+    String keyName = UUID.randomUUID().toString();
+    try {
+      // Simulating first mismatch: write less data than expected
+      client.getProxy().setIsS3Request(true);
+      OzoneOutputStream out1 = bucket.createKey(keyName,
+          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+          new HashMap<>());
+      out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
+      Assertions.assertThrows(IllegalStateException.class, out1::close,
+          "Expected IllegalStateException due to size mismatch.");
+
+      // Simulating second mismatch: write more data than expected
+      OzoneOutputStream out2 = bucket.createKey(keyName,
+          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+          new HashMap<>());
+      value += "1";
+      out2.write(value.getBytes(UTF_8));
+      Assertions.assertThrows(IllegalStateException.class, out2::close,
+          "Expected IllegalStateException due to size mismatch.");
+    } finally {
+      client.getProxy().setIsS3Request(false);
+    }
+  }
+
   private OzoneBucket getOzoneBucket() throws IOException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 9473467b8b98..05b7a62c0623 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -127,9 +127,10 @@ public void initialization() {
         signatureInfo.getSignature(),
         signatureInfo.getAwsAccessId(),
         signatureInfo.getAwsAccessId());
     LOG.debug("S3 access id: {}", s3Auth.getAccessID());
-    getClient().getObjectStore()
-        .getClientProxy()
-        .setThreadLocalS3Auth(s3Auth);
+    ClientProtocol clientProtocol =
+        getClient().getObjectStore().getClientProxy();
+    clientProtocol.setThreadLocalS3Auth(s3Auth);
+    clientProtocol.setIsS3Request(true);
     init();
   }
 
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 9503c53cfd5f..d85a628ea3e3 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -123,6 +124,7 @@
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.PRECOND_FAILED;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER_RANGE;
@@ -205,6 +207,7 @@ public void init() {
    * See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for
    * more details.
    */
+  @SuppressWarnings("checkstyle:MethodLength")
   @PUT
   public Response put(
       @PathParam("bucket") String bucketName,
@@ -217,9 +220,6 @@ public Response put(
     S3GAction s3GAction = S3GAction.CREATE_KEY;
     boolean auditSuccess = true;
-
-    OzoneOutputStream output = null;
-    String copyHeader = null, storageType = null;
     try {
       OzoneVolume volume = getVolume();
@@ -289,6 +289,7 @@
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
             E_TAG_PROVIDER.get());
+        length = Long.parseLong(amzDecodedLength);
       } else {
         body = new DigestInputStream(body, E_TAG_PROVIDER.get());
       }
@@ -303,14 +304,16 @@
         eTag = keyWriteResult.getKey();
         putLength = keyWriteResult.getValue();
       } else {
-        output = getClientProtocol().createKey(volume.getName(), bucketName,
-            keyPath, length, replicationConfig, customMetadata);
-        getMetrics().updatePutKeyMetadataStats(startNanos);
-        putLength = IOUtils.copyLarge(body, output);
-        eTag = DatatypeConverter.printHexBinary(
-            ((DigestInputStream) body).getMessageDigest().digest())
-            .toLowerCase();
-        output.getMetadata().put(ETAG, eTag);
+        try (OzoneOutputStream output = getClientProtocol().createKey(
+            volume.getName(), bucketName, keyPath, length, replicationConfig,
+            customMetadata)) {
+          getMetrics().updatePutKeyMetadataStats(startNanos);
+          putLength = IOUtils.copyLarge(body, output);
+          eTag = DatatypeConverter.printHexBinary(
+              ((DigestInputStream) body).getMessageDigest().digest())
+              .toLowerCase();
+          output.getMetadata().put(ETAG, eTag);
+        }
       }
       getMetrics().incPutKeySuccessLength(putLength);
@@ -352,9 +355,6 @@
       }
       throw ex;
     } finally {
-      if (output != null) {
-        output.close();
-      }
       if (auditSuccess) {
         AUDIT.logWriteSuccess(
             buildAuditMessageForSuccess(s3GAction, getAuditParameters()));
@@ -845,6 +845,7 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket,
     }
   }
 
+  @SuppressWarnings("checkstyle:MethodLength")
   private Response createMultipartKey(OzoneVolume volume, String bucket,
       String key, long length, int partNumber, String uploadID,
       InputStream body)
       throws IOException, OS3Exception {
     long startNanos = Time.monotonicNowNanos();
     String copyHeader = null;
     try {
-      OzoneOutputStream ozoneOutputStream = null;
       if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
             E_TAG_PROVIDER.get());
+        length = Long.parseLong(
+            headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
       } else {
         body = new DigestInputStream(body, E_TAG_PROVIDER.get());
       }
@@ -875,78 +877,96 @@
         enableEC = true;
       }
 
-      try {
-        if (datastreamEnabled && !enableEC && copyHeader == null) {
-          getMetrics().updatePutKeyMetadataStats(startNanos);
-          return ObjectEndpointStreaming
-              .createMultipartKey(ozoneBucket, key, length, partNumber,
-                  uploadID, chunkSize, (DigestInputStream) body);
+      if (datastreamEnabled && !enableEC && copyHeader == null) {
+        getMetrics().updatePutKeyMetadataStats(startNanos);
+        return ObjectEndpointStreaming
+            .createMultipartKey(ozoneBucket, key, length, partNumber,
+                uploadID, chunkSize, (DigestInputStream) body);
+      }
+      // OmMultipartCommitUploadPartInfo can only be obtained after the
+      // OzoneOutputStream is closed, so we need to save the KeyOutputStream
+      // in the OzoneOutputStream and use it to get the
+      // OmMultipartCommitUploadPartInfo after the OzoneOutputStream is closed.
+      KeyOutputStream keyOutputStream = null;
+      if (copyHeader != null) {
+        Pair<String, String> result = parseSourceHeader(copyHeader);
+        String sourceBucket = result.getLeft();
+        String sourceKey = result.getRight();
+
+        OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
+            volume.getName(), sourceBucket, sourceKey);
+        String range =
+            headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+        RangeHeader rangeHeader = null;
+        if (range != null) {
+          rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
+          // When copying a range, the size of the target key is the
+          // length specified by COPY_SOURCE_HEADER_RANGE.
+          length = rangeHeader.getEndOffset() -
+              rangeHeader.getStartOffset() + 1;
+        } else {
+          length = sourceKeyDetails.getDataSize();
+        }
+        Long sourceKeyModificationTime = sourceKeyDetails
+            .getModificationTime().toEpochMilli();
+        String copySourceIfModifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
+        String copySourceIfUnmodifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
+        if (!checkCopySourceModificationTime(sourceKeyModificationTime,
+            copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
+          throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
+        }
-      ozoneOutputStream = getClientProtocol().createMultipartKey(
-          volume.getName(), bucket, key, length, partNumber, uploadID);
-
-      if (copyHeader != null) {
-        Pair<String, String> result = parseSourceHeader(copyHeader);
-
-        String sourceBucket = result.getLeft();
-        String sourceKey = result.getRight();
-
-        OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
-            volume.getName(), sourceBucket, sourceKey);
-        Long sourceKeyModificationTime = sourceKeyDetails
-            .getModificationTime().toEpochMilli();
-        String copySourceIfModifiedSince =
-            headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
-        String copySourceIfUnmodifiedSince =
-            headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
-        if (!checkCopySourceModificationTime(sourceKeyModificationTime,
-            copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
-          throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
-        }
-        try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
-
-          String range =
-              headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
-          long copyLength;
-          if (range != null) {
-            RangeHeader rangeHeader =
-                RangeHeaderParserUtil.parseRangeHeader(range, 0);
-            final long skipped =
-                sourceObject.skip(rangeHeader.getStartOffset());
-            if (skipped != rangeHeader.getStartOffset()) {
-              throw new EOFException(
-                  "Bytes to skip: "
-                      + rangeHeader.getStartOffset() + " actual: " + skipped);
-            }
+        try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
+          long copyLength;
+          if (range != null) {
+            final long skipped =
+                sourceObject.skip(rangeHeader.getStartOffset());
+            if (skipped != rangeHeader.getStartOffset()) {
+              throw new EOFException(
+                  "Bytes to skip: "
+                      + rangeHeader.getStartOffset() + " actual: " + skipped);
+            }
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
-              copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0,
-                  rangeHeader.getEndOffset() -
-                      rangeHeader.getStartOffset()
-                      + 1);
-          } else {
+              copyLength = IOUtils.copyLarge(
+                  sourceObject, ozoneOutputStream, 0, length);
+              keyOutputStream = ozoneOutputStream.getKeyOutputStream();
+            }
+          } else {
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
               copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+              keyOutputStream = ozoneOutputStream.getKeyOutputStream();
             }
-            getMetrics().incCopyObjectSuccessLength(copyLength);
           }
-        } else {
+          getMetrics().incCopyObjectSuccessLength(copyLength);
+        }
+      } else {
+        long putLength;
+        try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+            .createMultipartKey(volume.getName(), bucket, key, length,
+                partNumber, uploadID)) {
           getMetrics().updatePutKeyMetadataStats(startNanos);
-          long putLength = IOUtils.copyLarge(body, ozoneOutputStream);
+          putLength = IOUtils.copyLarge(body, ozoneOutputStream);
           ((KeyMetadataAware)ozoneOutputStream.getOutputStream())
-              .getMetadata().put("ETag", DatatypeConverter.printHexBinary(
-              ((DigestInputStream) body).getMessageDigest().digest())
+              .getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
+                  ((DigestInputStream) body).getMessageDigest().digest())
               .toLowerCase());
-          getMetrics().incPutKeySuccessLength(putLength);
-        }
-      } finally {
-        if (ozoneOutputStream != null) {
-          ozoneOutputStream.close();
+          keyOutputStream
+              = ozoneOutputStream.getKeyOutputStream();
         }
+        getMetrics().incPutKeySuccessLength(putLength);
       }
-      assert ozoneOutputStream != null;
+      assert keyOutputStream != null;
       OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
-          ozoneOutputStream.getCommitUploadPartInfo();
+          keyOutputStream.getCommitUploadPartInfo();
       String eTag = omMultipartCommitUploadPartInfo.getPartName();
 
       if (copyHeader != null) {
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index ef87ad450d1c..b536b3248b8e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -22,6 +22,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -145,18 +146,24 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
       String uploadID, int chunkSize, DigestInputStream body)
       throws IOException, OS3Exception {
-    OzoneDataStreamOutput streamOutput = null;
     String eTag;
     S3GatewayMetrics metrics = S3GatewayMetrics.create();
+    // OmMultipartCommitUploadPartInfo can only be obtained after the
+    // OzoneDataStreamOutput is closed, so we need to save the
+    // KeyDataStreamOutput in the OzoneDataStreamOutput and use it to get the
+    // OmMultipartCommitUploadPartInfo after OzoneDataStreamOutput is closed.
+    KeyDataStreamOutput keyDataStreamOutput = null;
     try {
-      streamOutput = ozoneBucket
-          .createMultipartStreamKey(key, length, partNumber, uploadID);
-      long putLength =
-          writeToStreamOutput(streamOutput, body, chunkSize, length);
-      eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
-          .toLowerCase();
-      ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
-      metrics.incPutKeySuccessLength(putLength);
+      try (OzoneDataStreamOutput streamOutput = ozoneBucket
+          .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+        long putLength =
+            writeToStreamOutput(streamOutput, body, chunkSize, length);
+        eTag = DatatypeConverter.printHexBinary(
+            body.getMessageDigest().digest()).toLowerCase();
+        ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
+        metrics.incPutKeySuccessLength(putLength);
+        keyDataStreamOutput = streamOutput.getKeyDataStreamOutput();
+      }
     } catch (OMException ex) {
       if (ex.getResult() ==
           OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
@@ -168,10 +175,9 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
       }
       throw ex;
     } finally {
-      if (streamOutput != null) {
-        streamOutput.close();
+      if (keyDataStreamOutput != null) {
         OmMultipartCommitUploadPartInfo commitUploadPartInfo =
-            streamOutput.getCommitUploadPartInfo();
+            keyDataStreamOutput.getCommitUploadPartInfo();
         eTag = commitUploadPartInfo.getPartName();
       }
     }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 5505688b26c1..71f478435079 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -577,6 +577,11 @@ public void setThreadLocalS3Auth(S3Auth s3Auth) {
 
   }
 
+  @Override
+  public void setIsS3Request(boolean isS3Request) {
+
+  }
+
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return null;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index d546afe2c7c2..fad3386c61c4 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -512,7 +512,7 @@ public OzoneMultipartUploadPartListParts listParts(String key,
       if (partEntry.getKey() > partNumberMarker) {
         PartInfo partInfo = new PartInfo(partEntry.getKey(),
             partEntry.getValue().getPartName(),
-            partEntry.getValue().getContent().length, Time.now());
+            Time.now(), partEntry.getValue().getContent().length);
         partInfoList.add(partInfo);
         count++;
       }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 326d388b88e9..00a7ba557490 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -20,6 +20,9 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
@@ -69,6 +72,18 @@ public synchronized void close() throws IOException {
     }
   }
 
+  @Override
+  public KeyOutputStream getKeyOutputStream() {
+    return new KeyOutputStream(
+        ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+      @Override
+      public synchronized OmMultipartCommitUploadPartInfo
+          getCommitUploadPartInfo() {
+        return OzoneOutputStreamStub.this.getCommitUploadPartInfo();
+      }
+    };
+  }
+
   @Override
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
     return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index 7422806db743..7186ceb5578e 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest.Part;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 
@@ -385,6 +386,28 @@ private Part uploadPartWithCopy(String key, String uploadID, int partNumber,
     return part;
   }
 
+  @Test
+  public void testUploadWithRangeCopyContentLength()
+      throws IOException, OS3Exception {
+    // The contentLength specified when creating the key should be the same
+    // as the Content-Length; on commit, the key compares the Content-Length
+    // with the actual length of the data written.
+
+    String uploadID = initiateMultipartUpload(KEY);
+    ByteArrayInputStream body = new ByteArrayInputStream("".getBytes(UTF_8));
+    Map<String, String> additionalHeaders = new HashMap<>();
+    additionalHeaders.put(COPY_SOURCE_HEADER,
+        OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY);
+    additionalHeaders.put(COPY_SOURCE_HEADER_RANGE, "bytes=0-3");
+    setHeaders(additionalHeaders);
+    REST.put(OzoneConsts.S3_BUCKET, KEY, 0, 1, uploadID, body);
+    OzoneMultipartUploadPartListParts parts =
+        CLIENT.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+            .listParts(KEY, uploadID, 0, 100);
+    Assert.assertEquals(1, parts.getPartInfoList().size());
+    Assert.assertEquals(4, parts.getPartInfoList().get(0).getSize());
+  }
+
   private void completeMultipartUpload(String key,
       CompleteMultipartUploadRequest completeMultipartUploadRequest,
       String uploadID) throws IOException, OS3Exception {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 55c661084beb..4c1fd9a7aaf0 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -49,6 +49,7 @@
 import org.mockito.Mockito;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.urlEncode;
@@ -140,6 +141,47 @@ public void testPutObjectWithECReplicationConfig()
     Assert.assertEquals(CONTENT, keyContent);
   }
 
+  @Test
+  public void testPutObjectContentLength() throws IOException, OS3Exception {
+    // The contentLength specified when creating the key should be the same
+    // as the Content-Length; on commit, the key compares the Content-Length
+    // with the actual length of the data written.
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    objectEndpoint.setHeaders(headers);
+    long dataSize = CONTENT.length();
+
+    objectEndpoint.put(bucketName, keyName, dataSize, 0, null, body);
+    Assert.assertEquals(dataSize, getKeyDataSize(keyName));
+  }
+
+  @Test
+  public void testPutObjectContentLengthForStreaming()
+      throws IOException, OS3Exception {
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    objectEndpoint.setHeaders(headers);
+
+    String chunkedContent = "0a;chunk-signature=signature\r\n"
+        + "1234567890\r\n"
+        + "05;chunk-signature=signature\r\n"
+        + "abcde\r\n";
+
+    when(headers.getHeaderString("x-amz-content-sha256"))
+        .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
+    objectEndpoint.put(bucketName, keyName, chunkedContent.length(), 0, null,
+        new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+    Assert.assertEquals(15, getKeyDataSize(keyName));
+  }
+
+  private long getKeyDataSize(String key) throws IOException {
+    return clientStub.getObjectStore().getS3Bucket(bucketName)
+        .getKey(key).getDataSize();
+  }
+
   @Test
   public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
     //GIVEN
@@ -153,6 +195,8 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
     when(headers.getHeaderString("x-amz-content-sha256"))
         .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
 
     //WHEN
     Response response = objectEndpoint.put(bucketName, keyName,
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 1b0e808f2d4d..6ba1b557ec3e 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -24,7 +24,9 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -33,9 +35,12 @@
 import javax.ws.rs.core.Response;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,11 +54,12 @@ public class TestPartUpload {
 
   private static final ObjectEndpoint REST = new ObjectEndpoint();
+  private static OzoneClient client;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    OzoneClient client = new OzoneClientStub();
+    client = new OzoneClientStub();
 
     client.getObjectStore().createS3Bucket(OzoneConsts.S3_BUCKET);
 
@@ -135,4 +141,69 @@ public void testPartUploadWithIncorrectUploadID() throws Exception {
       assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
     }
   }
+
+  @Test
+  public void testPartUploadStreamContentLength()
+      throws IOException, OS3Exception {
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    ObjectEndpoint objectEndpoint = new ObjectEndpoint();
+    objectEndpoint.setHeaders(headers);
+    objectEndpoint.setClient(client);
+    objectEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+    String keyName = UUID.randomUUID().toString();
+
+    String chunkedContent = "0a;chunk-signature=signature\r\n"
+        + "1234567890\r\n"
+        + "05;chunk-signature=signature\r\n"
+        + "abcde\r\n";
+    when(headers.getHeaderString("x-amz-content-sha256"))
+        .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
+
+    Response response = objectEndpoint.initializeMultipartUpload(
+        OzoneConsts.S3_BUCKET, keyName);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+    long contentLength = chunkedContent.length();
+
+    objectEndpoint.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1,
+        uploadID, new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+    assertContentLength(uploadID, keyName, 15);
+  }
+
+  @Test
+  public void testPartUploadContentLength() throws IOException, OS3Exception {
+    // The contentLength specified when creating the key should be the same
+    // as the Content-Length; on commit, the key compares the Content-Length
+    // with the actual length of the data written.
+
+    String keyName = UUID.randomUUID().toString();
+    Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET,
+        keyName);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+    String content = "Multipart Upload";
+    long contentLength = content.length();
+
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(content.getBytes(UTF_8));
+    REST.put(OzoneConsts.S3_BUCKET, keyName,
+        contentLength, 1, uploadID, body);
+    assertContentLength(uploadID, keyName, content.length());
+  }
+
+  private void assertContentLength(String uploadID, String key,
+      long contentLength) throws IOException {
+    OzoneMultipartUploadPartListParts parts =
+        client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+            .listParts(key, uploadID, 0, 100);
+    Assert.assertEquals(1, parts.getPartInfoList().size());
+    Assert.assertEquals(contentLength,
+        parts.getPartInfoList().get(0).getSize());
+  }
 }
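Editor's note on the commit-time contract introduced by this patch (a minimal sketch, not part of the diff): once a client is flagged via setIsS3Request(true), the key size declared at create time becomes binding when the key is committed. The snippet below mirrors the TestOzoneClient scenario above and assumes an already existing OzoneBucket named bucket; the key name and sizes are illustrative.

    // Declare a 1024-byte key, but write only 1023 bytes before close().
    long declaredSize = 1024;
    OzoneOutputStream out = bucket.createKey("sample-key", declaredSize,
        ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
    out.write(new byte[1023]);
    // close() commits the key; KeyOutputStream.close() evaluates
    // Preconditions.checkState(expectedSize == offset, ...) and throws
    // IllegalStateException, so no partial object ever becomes visible.
    out.close();

The same check runs in KeyDataStreamOutput and ECKeyOutputStream, so streaming and EC writes fail the commit in the same way when the sizes diverge.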
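A second note on why put() and createMultipartKey() now re-read the length for signed streaming uploads: with aws-chunked payloads the HTTP Content-Length covers the chunk framing, while the atomic check must compare against payload bytes only, which S3 clients advertise in the x-amz-decoded-content-length header (DECODED_CONTENT_LENGTH_HEADER). A worked example using the chunked body from the tests above; the arithmetic is an illustration of that test data, not library behavior.

    String chunkedContent = "0a;chunk-signature=signature\r\n"
        + "1234567890\r\n"
        + "05;chunk-signature=signature\r\n"
        + "abcde\r\n";
    long wireLength = chunkedContent.length(); // larger than 15: includes chunk headers and CRLFs
    long decodedLength = 0x0a + 0x05;          // 10 + 5 = 15 payload bytes
    // The endpoint therefore overwrites `length` with the decoded value before
    // calling createKey()/createMultipartKey(), so the commit-time size check
    // compares against 15 rather than the wire length.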