Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hadoop-ozone/s3gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@
<artifactId>hdds-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,14 @@ public Response put(
@HeaderParam("Content-Length") long length,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") @DefaultValue("") String uploadID,
InputStream body) throws IOException, OS3Exception {
final InputStream body) throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.CREATE_KEY;
boolean auditSuccess = true;
PerformanceStringBuilder perf = new PerformanceStringBuilder();

String copyHeader = null, storageType = null;
DigestInputStream digestInputStream = null;
try {
OzoneVolume volume = getVolume();
if (uploadID != null && !uploadID.equals("")) {
Expand Down Expand Up @@ -297,11 +298,11 @@ public Response put(

if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
getMessageDigestInstance());
length = Long.parseLong(amzDecodedLength);
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
}

long putLength;
Expand All @@ -310,7 +311,7 @@ public Response put(
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
customMetadata, (DigestInputStream) body, perf);
customMetadata, digestInputStream, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
Expand All @@ -320,9 +321,9 @@ public Response put(
long metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
putLength = IOUtils.copyLarge(body, output);
putLength = IOUtils.copyLarge(digestInputStream, output);
eTag = DatatypeConverter.printHexBinary(
((DigestInputStream) body).getMessageDigest().digest())
digestInputStream.getMessageDigest().digest())
.toLowerCase();
output.getMetadata().put(ETAG, eTag);
}
Expand Down Expand Up @@ -367,6 +368,11 @@ public Response put(
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (digestInputStream != null) {
digestInputStream.getMessageDigest().reset();
}
if (auditSuccess) {
long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
perf.appendOpLatencyNanos(opLatencyNs);
Expand Down Expand Up @@ -879,20 +885,21 @@ public Response completeMultipartUpload(@PathParam("bucket") String bucket,
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
private Response createMultipartKey(OzoneVolume volume, String bucket,
String key, long length, int partNumber, String uploadID,
InputStream body, PerformanceStringBuilder perf)
final InputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
String copyHeader = null;
DigestInputStream digestInputStream = null;
try {

if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new DigestInputStream(new SignedChunksInputStream(body),
E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(new SignedChunksInputStream(body),
getMessageDigestInstance());
length = Long.parseLong(
headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
} else {
body = new DigestInputStream(body, E_TAG_PROVIDER.get());
digestInputStream = new DigestInputStream(body, getMessageDigestInstance());
}

copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
Expand All @@ -912,7 +919,7 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
perf.appendStreamMode();
return ObjectEndpointStreaming
.createMultipartKey(ozoneBucket, key, length, partNumber,
uploadID, chunkSize, (DigestInputStream) body, perf);
uploadID, chunkSize, digestInputStream, perf);
}
// OmMultipartCommitUploadPartInfo can only be gotten after the
// OzoneOutputStream is closed, so we need to save the KeyOutputStream
Expand Down Expand Up @@ -993,10 +1000,10 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
partNumber, uploadID)) {
metadataLatencyNs =
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(body, ozoneOutputStream);
putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream);
((KeyMetadataAware)ozoneOutputStream.getOutputStream())
.getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
((DigestInputStream) body).getMessageDigest().digest())
digestInputStream.getMessageDigest().digest())
.toLowerCase());
keyOutputStream
= ozoneOutputStream.getKeyOutputStream();
Expand Down Expand Up @@ -1042,6 +1049,12 @@ private Response createMultipartKey(OzoneVolume volume, String bucket,
throw os3Exception;
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (digestInputStream != null) {
digestInputStream.getMessageDigest().reset();
}
}
}

Expand Down Expand Up @@ -1122,21 +1135,20 @@ public void setContext(ContainerRequestContext context) {
}

@SuppressWarnings("checkstyle:ParameterNumber")
void copy(OzoneVolume volume, InputStream src, long srcKeyLen,
void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
String destKey, String destBucket,
ReplicationConfig replication,
Map<String, String> metadata,
PerformanceStringBuilder perf, long startNanos)
throws IOException {
long copyLength;
src = new DigestInputStream(src, E_TAG_PROVIDER.get());
if (datastreamEnabled && !(replication != null &&
replication.getReplicationType() == EC) &&
srcKeyLen > datastreamMinLength) {
perf.appendStreamMode();
copyLength = ObjectEndpointStreaming
.copyKeyWithStream(volume.getBucket(destBucket), destKey, srcKeyLen,
chunkSize, replication, metadata, (DigestInputStream) src, perf, startNanos);
chunkSize, replication, metadata, src, perf, startNanos);
} else {
try (OzoneOutputStream dest = getClientProtocol()
.createKey(volume.getName(), destBucket, destKey, srcKeyLen,
Expand All @@ -1145,9 +1157,7 @@ void copy(OzoneVolume volume, InputStream src, long srcKeyLen,
getMetrics().updateCopyKeyMetadataStats(startNanos);
perf.appendMetaLatencyNanos(metadataLatencyNs);
copyLength = IOUtils.copyLarge(src, dest);
String eTag = DatatypeConverter.printHexBinary(
((DigestInputStream) src).getMessageDigest().digest())
.toLowerCase();
String eTag = DatatypeConverter.printHexBinary(src.getMessageDigest().digest()).toLowerCase();
dest.getMetadata().put(ETAG, eTag);
}
}
Expand All @@ -1166,6 +1176,7 @@ private CopyObjectResponse copyObject(OzoneVolume volume,

String sourceBucket = result.getLeft();
String sourceKey = result.getRight();
DigestInputStream sourceDigestInputStream = null;
try {
OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
volume.getName(), sourceBucket, sourceKey);
Expand Down Expand Up @@ -1195,11 +1206,11 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
}
}
long sourceKeyLen = sourceKeyDetails.getDataSize();

try (OzoneInputStream src = getClientProtocol().getKey(volume.getName(),
sourceBucket, sourceKey)) {
getMetrics().updateCopyKeyMetadataStats(startNanos);
copy(volume, src, sourceKeyLen, destkey, destBucket, replicationConfig,
sourceDigestInputStream = new DigestInputStream(src, getMessageDigestInstance());
copy(volume, sourceDigestInputStream, sourceKeyLen, destkey, destBucket, replicationConfig,
sourceKeyDetails.getMetadata(), perf, startNanos);
}

Expand All @@ -1221,6 +1232,12 @@ private CopyObjectResponse copyObject(OzoneVolume volume,
destBucket + "/" + destkey, ex);
}
throw ex;
} finally {
// Reset the thread-local message digest instance in case of exception
// and MessageDigest#digest is never called
if (sourceDigestInputStream != null) {
sourceDigestInputStream.getMessageDigest().reset();
}
}
}

Expand Down Expand Up @@ -1321,4 +1338,9 @@ private String wrapInQuotes(String value) {
return "\"" + value + "\"";
}

@VisibleForTesting
public MessageDigest getMessageDigestInstance() {
return E_TAG_PROVIDER.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.io.IOUtils;
Expand All @@ -46,6 +48,7 @@
import org.apache.http.HttpStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
Expand All @@ -57,10 +60,13 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -88,7 +94,7 @@ public void setup() throws IOException {
clientStub.getObjectStore().createS3Bucket(destBucket);

// Create PutObject and setClient to OzoneClientStub
objectEndpoint = new ObjectEndpoint();
objectEndpoint = spy(new ObjectEndpoint());
objectEndpoint.setClient(clientStub);
objectEndpoint.setOzoneConfiguration(new OzoneConfiguration());
}
Expand Down Expand Up @@ -226,6 +232,31 @@ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG)));
}

@Test
public void testPutObjectMessageDigestResetDuringException() throws OS3Exception {
MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// For example, EOFException during put-object due to client cancelling the operation before it completes
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
.thenThrow(IOException.class);
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);

HttpHeaders headers = mock(HttpHeaders.class);
ByteArrayInputStream body =
new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
objectEndpoint.setHeaders(headers);
try {
objectEndpoint.put(bucketName, keyName, CONTENT
.length(), 1, null, body);
fail("Should throw IOException");
} catch (IOException ignored) {
// Verify that the message digest is reset so that the instance can be reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
}
}
}

@Test
public void testCopyObject() throws IOException, OS3Exception {
// Put object in to source bucket
Expand Down Expand Up @@ -314,6 +345,53 @@ public void testCopyObject() throws IOException, OS3Exception {
assertThat(e.getCode()).contains("NoSuchBucket");
}

@Test
public void testCopyObjectMessageDigestResetDuringException() throws IOException, OS3Exception {
// Put object in to source bucket
HttpHeaders headers = mock(HttpHeaders.class);
ByteArrayInputStream body =
new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
objectEndpoint.setHeaders(headers);
keyName = "sourceKey";

Response response = objectEndpoint.put(bucketName, keyName,
CONTENT.length(), 1, null, body);

OzoneInputStream ozoneInputStream = clientStub.getObjectStore()
.getS3Bucket(bucketName)
.readKey(keyName);

String keyContent = IOUtils.toString(ozoneInputStream, UTF_8);
OzoneKeyDetails keyDetails = clientStub.getObjectStore().getS3Bucket(bucketName).getKey(keyName);

assertEquals(200, response.getStatus());
assertEquals(CONTENT, keyContent);
assertNotNull(keyDetails.getMetadata());
assertTrue(StringUtils.isNotEmpty(keyDetails.getMetadata().get(OzoneConsts.ETAG)));

MessageDigest messageDigest = mock(MessageDigest.class);
try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
// Add the mocked methods only during the copy request
when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class)))
.thenThrow(IOException.class);

// Add copy header, and then call put
when(headers.getHeaderString(COPY_SOURCE_HEADER)).thenReturn(
bucketName + "/" + urlEncode(keyName));

try {
objectEndpoint.put(destBucket, destkey, CONTENT.length(), 1,
null, body);
fail("Should throw IOException");
} catch (IOException ignored) {
// Verify that the message digest is reset so that the instance can be reused for the
// next request in the same thread
verify(messageDigest, times(1)).reset();
}
}
}

@Test
public void testInvalidStorageType() throws IOException {
HttpHeaders headers = mock(HttpHeaders.class);
Expand Down
Loading