@@ -54,6 +54,7 @@
import java.util.OptionalLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -93,6 +94,13 @@

import org.apache.commons.lang3.tuple.Pair;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT;
@@ -141,6 +149,9 @@ public class ObjectEndpoint extends EndpointBase {
https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
private Map<String, String> overrideQueryParameter;
private int bufferSize;
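// Streaming PUT support: chunkSize is the write-buffer size for the
// streaming path; datastreamEnabled and datastreamMinLength decide when
// that path is taken instead of the buffered createKey path.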
private int chunkSize;
private boolean datastreamEnabled;
private long datastreamMinLength;

public ObjectEndpoint() {
overrideQueryParameter = ImmutableMap.<String, String>builder()
@@ -161,6 +172,16 @@ public void init() {
bufferSize = (int) ozoneConfiguration.getStorageSize(
OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
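// Streaming-related settings: reuse the SCM chunk size as the streaming
// write-buffer size, and read the datastream flag and auto threshold
// that gate the streaming path in put().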
chunkSize = (int) ozoneConfiguration.getStorageSize(
OZONE_SCM_CHUNK_SIZE_KEY,
OZONE_SCM_CHUNK_SIZE_DEFAULT,
StorageUnit.BYTES);
datastreamEnabled = ozoneConfiguration.getBoolean(
DFS_CONTAINER_RATIS_DATASTREAM_ENABLED,
DFS_CONTAINER_RATIS_DATASTREAM_ENABLED_DEFAULT);
datastreamMinLength = (long) ozoneConfiguration.getStorageSize(
OZONE_FS_DATASTREAM_AUTO_THRESHOLD,
OZONE_FS_DATASTREAM_AUTO_THRESHOLD_DEFAULT, StorageUnit.BYTES);
}

/**
@@ -203,6 +224,13 @@ public Response put(
ReplicationConfig replicationConfig =
getReplicationConfig(bucket, storageType);

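// The streaming path below is skipped for EC keys, so detect EC
// replication from either the resolved replication config or the
// bucket's default replication.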
boolean enableEC = (replicationConfig != null &&
replicationConfig.getReplicationType() == EC) ||
bucket.getReplicationConfig() instanceof ECReplicationConfig;

if (copyHeader != null) {
// Copy object, as a copy source is available.
s3GAction = S3GAction.COPY_OBJECT;
@@ -233,11 +261,19 @@
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
body = new SignedChunksInputStream(body);
}
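// Stream straight to the datanodes only when datastream is enabled, the
// key is not EC-replicated, and the object is large enough to clear the
// auto threshold; otherwise fall back to the buffered createKey path.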
long putLength = 0;
if (datastreamEnabled && !enableEC && length > datastreamMinLength) {
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
customMetadata, body);
} else {
output = getClientProtocol().createKey(volume.getName(), bucketName,
keyPath, length, replicationConfig, customMetadata);
getMetrics().updatePutKeyMetadataStats(startNanos);
putLength = IOUtils.copyLarge(body, output);
}

getMetrics().incPutKeySuccessLength(putLength);
return Response.ok().status(HttpStatus.SC_OK)
.build();
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.endpoint;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;

/**
* Key level rest endpoints for Streaming.
*/
final class ObjectEndpointStreaming {

private static final Logger LOG =
LoggerFactory.getLogger(ObjectEndpointStreaming.class);

private ObjectEndpointStreaming() {
}

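/**
* Writes the request body to the given key through the streaming client
* path, translating OM failures into the matching S3 error responses.
*/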
public static long put(OzoneBucket bucket, String keyPath,
long length, ReplicationConfig replicationConfig,
int chunkSize, Map<String, String> keyMetadata,
InputStream body)
throws IOException, OS3Exception {

try {
return putKeyWithStream(bucket, keyPath,
length, chunkSize, replicationConfig, keyMetadata, body);
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
if (ex instanceof OMException) {
if (((OMException) ex).getResult() ==
OMException.ResultCodes.NOT_A_FILE) {
OS3Exception os3Exception = S3ErrorTable.newError(INVALID_REQUEST,
keyPath);
os3Exception.setErrorMessage("An error occurred (InvalidRequest) " +
"when calling the PutObject/MPU PartUpload operation: " +
OZONE_OM_ENABLE_FILESYSTEM_PATHS + " is enabled, so keys are " +
"treated as Unix paths. The path violated filesystem semantics, " +
"which caused the put operation to fail.");
throw os3Exception;
} else if (((OMException) ex).getResult() ==
OMException.ResultCodes.PERMISSION_DENIED) {
throw S3ErrorTable.newError(S3ErrorTable.ACCESS_DENIED, keyPath);
}
}
throw ex;
}
}

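/**
* Creates a stream output for the key and copies the body into it; the
* stream output is closed, committing the key, before the written length
* is returned.
*/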
public static long putKeyWithStream(OzoneBucket bucket,
String keyPath,
long length,
int bufferSize,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
InputStream body)
throws IOException {
long writeLen = 0;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
length, replicationConfig, keyMetadata)) {
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
}
return writeLen;
}

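/**
* Copies up to length bytes from body into streamOutput in bufferSize-sized
* chunks. Returns the number of bytes written, which can be smaller than
* length if the input ends early.
*/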
private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
InputStream body, int bufferSize,
long length)
throws IOException {
final byte[] buffer = new byte[bufferSize];
long n = 0;
while (n < length) {
final int toRead = Math.toIntExact(Math.min(bufferSize, length - n));
final int readLength = body.read(buffer, 0, toRead);
if (readLength == -1) {
break;
}
streamOutput.write(ByteBuffer.wrap(buffer, 0, readLength));
n += readLength;
}
return n;
}
}
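
A minimal sketch (not part of this change) of the S3 gateway configuration that routes a large PUT through the new streaming path, using the config keys this patch reads in init(); the threshold value below is illustrative, not a recommended setting:

OzoneConfiguration conf = new OzoneConfiguration();
// Turn the Ratis datastream write path on.
conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
// Objects larger than this threshold take the streaming path in put().
conf.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "4MB");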