Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1873,11 +1873,9 @@

<property>
<name>fs.s3a.committer.magic.enabled</name>
<value>false</value>
<value>true</value>
<description>
Enable support in the filesystem for the S3 "Magic" committer.
When working with AWS S3, S3Guard must be enabled for the destination
bucket, as consistent metadata listings are required.
Enable support in the S3A filesystem for the "Magic" committer.
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ private CommitConstants() {
= "s3a:magic.committer";

/**
* Is the committer enabled by default? No.
* Is the committer enabled by default: {@value}.
*/
public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = false;
public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = true;

/**
* This is the "Pending" directory of the {@code FileOutputCommitter};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,6 @@ public void createSuccessMarker(Path outputPath,
conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
successData.addDiagnostic(AUTHORITATIVE_PATH,
conf.getTrimmed(AUTHORITATIVE_PATH, ""));
successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));

// now write
Path markerPath = new Path(outputPath, _SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,27 @@ Conflict management is left to the execution engine itself.

```

### Disabling magic committer path rewriting

The magic committer recognizes when files are created under paths with `__magic/` as a parent directory
and redirects the upload to a different location, adding the information needed to complete the upload
in the job commit operation.

If, for some reason, you *do not* want these paths to be redirected and not manifest until later,
the feature can be disabled by setting `fs.s3a.committer.magic.enabled` to false.

By default it is true.

```xml
<property>
<name>fs.s3a.committer.magic.enabled</name>
<value>true</value>
<description>
Enable support in the S3A filesystem for the "Magic" committer.
</description>
</property>
```

## <a name="concurrent-jobs"></a> Concurrent Jobs writing to the same destination

It is sometimes possible for multiple jobs to simultaneously write to the same destination path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;

Expand Down Expand Up @@ -71,8 +70,6 @@ public Configuration createConfiguration() {
// test we don't issue request to AWS DynamoDB service.
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
MetadataStore.class);
// FS is always magic
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
// use minimum multipart size for faster triggering
conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.junit.Assert.*;

/**
Expand Down Expand Up @@ -628,9 +627,6 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {
conf.set(HADOOP_TMP_DIR, tmpDir);
}
conf.set(BUFFER_DIR, tmpDir);
// add this so that even on tests where the FS is shared,
// the FS is always "magic"
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);

// directory marker policy
String directoryRetention = getTestProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@ public void testAssumedRoleRetryHandler() throws Throwable {
public void testRestrictedCommitActions() throws Throwable {
describe("Attempt commit operations against a path with restricted rights");
Configuration conf = createAssumedRoleConfig();
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
final int uploadPartSize = 5 * 1024 * 1024;

ProgressCounter progress = new ProgressCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected Configuration createConfiguration() {
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
FAST_UPLOAD_BUFFER);

conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED);
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ public boolean useInconsistentClient() {
return false;
}

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
return conf;
}

@Override
protected String getCommitterFactoryName() {
return CommitConstants.S3A_COMMITTER_FACTORY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -83,17 +82,6 @@ public String getTestSuiteName() {
return "ITestS3AHugeMagicCommits";
}

/**
* Create the scale IO conf with the committer enabled.
* @return the configuration to use for the test FS.
*/
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
return conf;
}

@Override
public void setup() throws Exception {
super.setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@

import static java.util.Optional.empty;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;

/**
* Runs Terasort against S3A.
Expand Down Expand Up @@ -155,7 +154,6 @@ public void setup() throws Exception {
@Override
protected void applyCustomConfigOptions(JobConf conf) {
// small sample size for faster runs
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
getSampleSizeForEachPartition());
conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
Expand All @@ -61,7 +60,6 @@
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo.IS_MARKER_AWARE;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.INVALID_ARGUMENT;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
Expand Down Expand Up @@ -590,16 +588,8 @@ public void testProbeForMagic() throws Throwable {
String name = fs.getUri().toString();
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
getConfiguration());
if (fs.hasPathCapability(fs.getWorkingDirectory(),
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
// if the FS is magic, expect this to work
// this must always work
exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
} else {
// if the FS isn't magic, expect the probe to fail
assertExitCode(E_BAD_STATE,
intercept(ExitUtil.ExitException.class,
() -> exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name)));
}
}

/**
Expand Down