diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 5c02ae2cf7c9..fe51cf77f2e1 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -54,6 +54,7 @@ import java.util.OptionalLong; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -93,6 +94,13 @@ import org.apache.commons.lang3.tuple.Pair; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT; @@ -141,6 +149,9 @@ public class ObjectEndpoint extends EndpointBase { https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */ private Map overrideQueryParameter; private int bufferSize; + private int chunkSize; + private boolean datastreamEnabled; + private long datastreamMinLength; public ObjectEndpoint() { overrideQueryParameter = ImmutableMap.builder() @@ -161,6 +172,16 @@ public void init() { bufferSize = (int) ozoneConfiguration.getStorageSize( OZONE_S3G_CLIENT_BUFFER_SIZE_KEY, OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES); + chunkSize = (int) ozoneConfiguration.getStorageSize( + OZONE_SCM_CHUNK_SIZE_KEY, + OZONE_SCM_CHUNK_SIZE_DEFAULT, + StorageUnit.BYTES); + datastreamEnabled = ozoneConfiguration.getBoolean( + DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, + DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT); + datastreamMinLength = (long) ozoneConfiguration.getStorageSize( + OZONE_FS_DATASTREAM_AUTO_THRESHOLD, + OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES); } /** @@ -203,6 +224,13 @@ public Response put( ReplicationConfig replicationConfig = getReplicationConfig(bucket, storageType); + boolean enableEC = false; + if ((replicationConfig != null && + replicationConfig.getReplicationType() == EC) || + bucket.getReplicationConfig() instanceof ECReplicationConfig) { + enableEC = true; + } + if (copyHeader != null) { //Copy object, as copy source available. s3GAction = S3GAction.COPY_OBJECT; @@ -233,11 +261,19 @@ public Response put( .equals(headers.getHeaderString("x-amz-content-sha256"))) { body = new SignedChunksInputStream(body); } + long putLength = 0; + if (datastreamEnabled && !enableEC && length > datastreamMinLength) { + getMetrics().updatePutKeyMetadataStats(startNanos); + putLength = ObjectEndpointStreaming + .put(bucket, keyPath, length, replicationConfig, chunkSize, + customMetadata, body); + } else { + output = getClientProtocol().createKey(volume.getName(), bucketName, + keyPath, length, replicationConfig, customMetadata); + getMetrics().updatePutKeyMetadataStats(startNanos); + putLength = IOUtils.copyLarge(body, output); + } - output = getClientProtocol().createKey(volume.getName(), bucketName, - keyPath, length, replicationConfig, customMetadata); - getMetrics().updatePutKeyMetadataStats(startNanos); - long putLength = IOUtils.copyLarge(body, output); getMetrics().incPutKeySuccessLength(putLength); return Response.ok().status(HttpStatus.SC_OK) .build(); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java new file mode 100644 index 000000000000..cfe298446778 --- /dev/null +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.s3.endpoint; + +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.s3.exception.OS3Exception; +import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS; +import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST; + +/** + * Key level rest endpoints for Streaming. + */ +final class ObjectEndpointStreaming { + + private static final Logger LOG = + LoggerFactory.getLogger(ObjectEndpointStreaming.class); + + private ObjectEndpointStreaming() { + } + + public static long put(OzoneBucket bucket, String keyPath, + long length, ReplicationConfig replicationConfig, + int chunkSize, Map keyMetadata, + InputStream body) + throws IOException, OS3Exception { + + try { + return putKeyWithStream(bucket, keyPath, + length, chunkSize, replicationConfig, keyMetadata, body); + } catch (IOException ex) { + LOG.error("Exception occurred in PutObject", ex); + if (ex instanceof OMException) { + if (((OMException) ex).getResult() == + OMException.ResultCodes.NOT_A_FILE) { + OS3Exception os3Exception = S3ErrorTable.newError(INVALID_REQUEST, + keyPath); + os3Exception.setErrorMessage("An error occurred (InvalidRequest) " + + "when calling the PutObject/MPU PartUpload operation: " + + OZONE_OM_ENABLE_FILESYSTEM_PATHS + " is enabled Keys are" + + " considered as Unix Paths. Path has Violated FS Semantics " + + "which caused put operation to fail."); + throw os3Exception; + } else if ((((OMException) ex).getResult() == + OMException.ResultCodes.PERMISSION_DENIED)) { + throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED, keyPath); + } + } + throw ex; + } + } + + public static long putKeyWithStream(OzoneBucket bucket, + String keyPath, + long length, + int bufferSize, + ReplicationConfig replicationConfig, + Map keyMetadata, + InputStream body) + throws IOException { + long writeLen = 0; + try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath, + length, replicationConfig, keyMetadata)) { + writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length); + } + return writeLen; + } + + private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput, + InputStream body, int bufferSize, + long length) + throws IOException { + final byte[] buffer = new byte[bufferSize]; + long n = 0; + while (n < length) { + final int toRead = Math.toIntExact(Math.min(bufferSize, length - n)); + final int readLength = body.read(buffer, 0, toRead); + if (readLength == -1) { + break; + } + streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength)); + n += readLength; + } + return n; + } +}