Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ can be queried to find if the path has an ACL. `getFileStatus(Path p).isEncrypte
can be queried to find if the path is encrypted. `getFileStatus(Path p).isErasureCoded()`
will tell if the path is erasure coded or not.

YARN's distributed cache lets applications add paths to be cached across
containers and applications via `Job.addCacheFile()` and `Job.addCacheArchive()`.
The cache treats world-readable resources paths added as shareable across
applications, and downloads them differently, unless they are declared as encrypted.

To avoid failures during container launching, especially when delegation tokens
are used, filesystems and object stores which not implement POSIX access permissions
for both files and directories, MUST always return `true` to the `isEncrypted()`
predicate. This can be done by setting the `encrypted` flag to true when creating
the `FileStatus` instance.

### `Path getHomeDirectory()`

The function `getHomeDirectory` returns the home directory for the FileSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,20 @@ public void testFsIsEncrypted() throws Exception {
final Path path = path("file");
createFile(getFileSystem(), path, false, new byte[0]);
final FileStatus stat = getFileSystem().getFileStatus(path);
assertFalse("Expecting false for stat.isEncrypted()",
assertEquals("Result wrong for for isEncrypted() in " + stat,
areZeroByteFilesEncrypted(),
stat.isEncrypted());
}

/**
* Are zero byte files encrypted. This is implicitly
* false for filesystems which do not encrypt.
* @return true iff zero byte files are encrypted.
*/
protected boolean areZeroByteFilesEncrypted() {
return false;
}

@Test
public void testOpenReadDir() throws Throwable {
describe("create & read a directory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public S3AFileStatus(boolean isemptydir,
public S3AFileStatus(Tristate isemptydir,
Path path,
String owner) {
super(0, true, 1, 0, 0, path);
super(0, true, 1, 0, 0, 0,
null, null, null, null,
path, false, true, false);
isEmptyDirectory = isemptydir;
setOwner(owner);
setGroup(owner);
Expand All @@ -70,7 +72,9 @@ public S3AFileStatus(Tristate isemptydir,
*/
public S3AFileStatus(long length, long modification_time, Path path,
long blockSize, String owner) {
super(length, false, 1, blockSize, modification_time, path);
super(length, false, 1, blockSize, modification_time, 0,
null, null, null, null,
path, false, true, false);
isEmptyDirectory = Tristate.FALSE;
setOwner(owner);
setGroup(owner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ protected Configuration createConfiguration() {
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}

/**
* S3A always declares zero byte files as encrypted.
* @return true, always.
*/
@Override
protected boolean areZeroByteFilesEncrypted() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ public static AbstractS3ATokenIdentifier lookupToken(
assertEquals("Kind of token " + token,
kind,
token.getKind());
return token.decodeIdentifier();
AbstractS3ATokenIdentifier tid
= token.decodeIdentifier();
LOG.info("Found for URI {}, token {}", uri, tid);
return tid;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.hadoop.fs.s3a.auth.delegation;

import java.net.URI;
import java.util.Arrays;
import java.util.Collection;

Expand All @@ -29,6 +30,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
Expand All @@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
Expand All @@ -72,7 +75,12 @@
* of org.apache.hadoop.mapreduce.protocol.ClientProtocol is mock.
*
* It's still an ITest though, as it does use S3A as the source and
* dest so as to collect URLs.
* dest so as to collect delegation tokens.
*
* It also uses the open street map open bucket, so that there's an extra
* S3 URL in job submission which can be added as a job resource.
* This is needed to verify that job resources have their tokens extracted
* too.
*/
@RunWith(Parameterized.class)
public class ITestDelegatedMRJob extends AbstractDelegationIT {
Expand All @@ -99,6 +107,11 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {

private Path destPath;

private static final Path EXTRA_JOB_RESOURCE_PATH
= new Path("s3a://osm-pds/planet/planet-latest.orc");

public static final URI jobResource = EXTRA_JOB_RESOURCE_PATH.toUri();

/**
* Test array for parameterized test runs.
* @return a list of parameter tuples.
Expand Down Expand Up @@ -149,6 +162,15 @@ protected YarnConfiguration createConfiguration() {
conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
10_000);

// turn off DDB for the job resource bucket
String host = jobResource.getHost();
conf.set(
String.format("fs.s3a.bucket.%s.metadatastore.impl", host),
S3GUARD_METASTORE_NULL);
// and fix to the main endpoint if the caller has moved
conf.set(
String.format("fs.s3a.bucket.%s.endpoint", host), "");

// set up DTs
enableDelegationTokens(conf, tokenBinding);
return conf;
Expand Down Expand Up @@ -210,6 +232,15 @@ protected int getTestTimeoutMillis() {
return getTestTimeoutSeconds() * 1000;
}

@Test
public void testCommonCrawlLookup() throws Throwable {
FileSystem resourceFS = EXTRA_JOB_RESOURCE_PATH.getFileSystem(
getConfiguration());
FileStatus status = resourceFS.getFileStatus(EXTRA_JOB_RESOURCE_PATH);
LOG.info("Extra job resource is {}", status);
assertTrue("Not encrypted: " + status, status.isEncrypted());
}

@Test
public void testJobSubmissionCollectsTokens() throws Exception {
describe("Mock Job test");
Expand Down Expand Up @@ -242,6 +273,14 @@ public void testJobSubmissionCollectsTokens() throws Exception {
job.setMaxMapAttempts(1);
job.setMaxReduceAttempts(1);

// and a file for a different store.
// This is to actually stress the terasort code for which
// the yarn ResourceLocalizationService was having problems with
// fetching resources from.
URI partitionUri = new URI(EXTRA_JOB_RESOURCE_PATH.toString() +
"#_partition.lst");
job.addCacheFile(partitionUri);

describe("Executing Mock Job Submission to %s", output);

job.submit();
Expand All @@ -267,6 +306,8 @@ public void testJobSubmissionCollectsTokens() throws Exception {
lookupToken(submittedCredentials, sourceFS.getUri(), tokenKind);
// look up the destination token
lookupToken(submittedCredentials, fs.getUri(), tokenKind);
lookupToken(submittedCredentials,
EXTRA_JOB_RESOURCE_PATH.getFileSystem(conf).getUri(), tokenKind);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.filecache;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.test.HadoopTestBase;

/**
* Test how S3A resources are scoped in YARN caching.
* In this package to make use of package-private methods of
* {@link ClientDistributedCacheManager}.
*/
public class TestS3AResourceScope extends HadoopTestBase {

private static final Path PATH = new Path("s3a://example/path");

@Test
public void testS3AFilesArePrivate() throws Throwable {
S3AFileStatus status = new S3AFileStatus(false, PATH, "self");
assertTrue("Not encrypted: " + status, status.isEncrypted());
assertNotExecutable(status);
}

@Test
public void testS3AFilesArePrivateOtherContstructor() throws Throwable {
S3AFileStatus status = new S3AFileStatus(0, 0, PATH, 1, "self");
assertTrue("Not encrypted: " + status, status.isEncrypted());
assertNotExecutable(status);
}

private void assertNotExecutable(final S3AFileStatus status)
throws IOException {
Map<URI, FileStatus> cache = new HashMap<>();
cache.put(PATH.toUri(), status);
assertFalse("Should not have been executable " + status,
ClientDistributedCacheManager.ancestorsHaveExecutePermissions(
null, PATH, cache));
}
}