Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public interface StreamCapabilities {
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";

/**
* Stream support multipart uploads to the given patch
* Stream supports multipart uploads to the given path.
*/
String MULTIPART_SUPPORTED = "fs.capability.multipart.supported";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ private Constants() {

/**
* Option to enable or disable the multipart uploads.
* Value: {@value}.
* <p>
* Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;

/**
* Is this S3A FS instance has multipart uploads enabled?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grammar nit
"is multipart upload enabled?"

*/
private boolean isMultipartEnabled;

/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
Expand Down Expand Up @@ -533,6 +538,8 @@ public void initialize(URI name, Configuration originalConf)
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
MULTIPART_UPLOAD_ENABLED_DEFAULT);

initThreadPools(conf);

Expand Down Expand Up @@ -1080,6 +1087,7 @@ protected RequestFactory createRequestFactory() {
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.withMultipartEnabled(isMultipartEnabled)
.build();
}

Expand Down Expand Up @@ -1831,8 +1839,8 @@ private FSDataOutputStream innerCreateFile(
new PutObjectOptions(keep, null, options.getHeaders());

if(!checkDiskBuffer(getConf())){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

Copy link
Contributor

@mukund-thakur mukund-thakur Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

This is still pending. I don't really mind leaving it as it is but I think my suggestion is consistent with other parts of the code and is more readable.
CC @steveloughran

throw new IOException("The filesystem conf is not " +
"proper for the output stream");
throw new IOException("Unable to create OutputStream with the given"
+ " multipart upload and buffer configuration.");
}

final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
Expand All @@ -1859,8 +1867,7 @@ private FSDataOutputStream innerCreateFile(
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(getConf().getBoolean(
MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT));
.withMultipartEnabled(isMultipartEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
Expand Down Expand Up @@ -5418,4 +5425,8 @@ public RequestFactory getRequestFactory() {
public boolean isCSEEnabled() {
return isCSEEnabled;
}

public boolean isMultipartEnabled() {
return isMultipartEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1037,8 +1037,8 @@ public static long getMultipartSizeProperty(Configuration conf,
* be supported. When the option is disabled only disk buffers are allowed to
* be used as the file size might be bigger than the buffer size that can be
* allocated.
* @param conf
* @return
* @param conf : configuration object for the given context
* @return true if the disk buffer and the multipart settings are supported
*/
public static boolean checkDiskBuffer(Configuration conf) {
boolean isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -199,7 +200,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
*/
InitiateMultipartUploadRequest newMultipartUploadRequest(
String destKey,
@Nullable PutObjectOptions options);
@Nullable PutObjectOptions options) throws IOException;

/**
* Complete a multipart upload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ protected AbstractS3ACommitter(
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
if (!fs.isMultipartEnabled()) {
throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ " the committer can't proceed.");
}
// set this thread's context with the job ID.
// audit spans created in this thread will pick
// up this value., including the commit operations instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final StorageClass storageClass;

/**
* Is Multipart Enabled
*/
private final boolean isMultipartEnabled;

/**
* Constructor.
* @param builder builder with all the configuration.
Expand All @@ -137,6 +142,7 @@ protected RequestFactoryImpl(
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
this.isMultipartEnabled = builder.isMultipartEnabled;
}

/**
Expand Down Expand Up @@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest(
@Override
public InitiateMultipartUploadRequest newMultipartUploadRequest(
final String destKey,
@Nullable final PutObjectOptions options) {
@Nullable final PutObjectOptions options) throws IOException {
if(!isMultipartEnabled){
throw new IOException("Multipart uploads are disabled on the given filesystem.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make a PathIOException and include destkey. This gives a bit more detail.

throw new PathIOException(destKey, "Multipart uploads are disabled");

}
final ObjectMetadata objectMetadata = newObjectMetadata(-1);
maybeSetMetadata(options, objectMetadata);
final InitiateMultipartUploadRequest initiateMPURequest =
Expand Down Expand Up @@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder {
*/
private PrepareRequest requestPreparer;

/**
* Is Multipart Enabled on the path.
*/
private boolean isMultipartEnabled = true;

private RequestFactoryBuilder() {
}

Expand Down Expand Up @@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer(
this.requestPreparer = value;
return this;
}

/**
* Multipart enabled
*
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withMultipartEnabled(
final boolean value) {
this.isMultipartEnabled = value;
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;

public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
return conf;
}

@Test
public void testCreateCommitter() {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = getFileSystem().makeQualified(
new Path(getContract().getTestPath(), "/testpath"));
LOG.debug("{}", commitPath);
assertThrows(PathCommitException.class,
() -> new MagicS3GuardCommitter(commitPath, tContext));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.staging.integration;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;

public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
return conf;
}

@Test
public void testCreateCommitter() {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = getFileSystem().makeQualified(
new Path(getContract().getTestPath(), "/testpath"));
LOG.debug("{}", commitPath);
assertThrows(PathCommitException.class,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same intercept.

() -> new StagingCommitter(commitPath, tContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -173,7 +174,11 @@ private void createFactoryObjects(RequestFactory factory) {
a(factory.newListObjectsV1Request(path, "/", 1));
a(factory.newListNextBatchOfObjectsRequest(new ObjectListing()));
a(factory.newListObjectsV2Request(path, "/", 1));
a(factory.newMultipartUploadRequest(path, null));
try {
a(factory.newMultipartUploadRequest(path, null));
} catch (IOException e) {
throw new RuntimeException(e);
}
File srcfile = new File("/tmp/a");
a(factory.newPutObjectRequest(path,
factory.newObjectMetadata(-1), null, srcfile));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;

/**
* Test a file upload using a single PUT operation. Multipart uploads will
Expand Down Expand Up @@ -67,7 +68,7 @@ public void uploadFileSinglePut() throws IOException {
//First one being the creation of test/ directory marker
//Second being the creation of the file with tests3ascale/<file-name>
//Third being the creation of directory marker tests3ascale/ on the file delete
assertEquals(3L,
(long) fs.getIOStatistics().counters().get(OBJECT_PUT_REQUESTS.getSymbol()));
assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
.isEqualTo(3);
}
}