29 changes: 27 additions & 2 deletions plugins/repository-hdfs/build.gradle
@@ -137,6 +137,10 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
} else {
miniHDFSArgs.add("-Dhdfs.config.port=" + getNonSecureNamenodePortForVersion(hadoopVer))
}
// This is required for webhdfs to work, otherwise miniHDFS will fail with module java.base does not "opens java.lang" to unnamed module
miniHDFSArgs.add('--add-opens')
miniHDFSArgs.add('java.base/java.lang=ALL-UNNAMED')
miniHDFSArgs.add('-Dhdfs.config.http.port=' + getHttpPortForVersion(hadoopVer))
// If it's an HA fixture, set a nameservice to use in the JVM options
if (name.startsWith('haHdfs') || name.startsWith('secureHaHdfs')) {
miniHDFSArgs.add("-Dha-nameservice=ha-hdfs")
@@ -189,7 +193,7 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
}
}

for (String integTestTaskName : ['yamlRestTest' + hadoopVersion, 'yamlRestTestSecure' + hadoopVersion]) {
for (String integTestTaskName : ['yamlRestTest' + hadoopVersion, 'yamlRestTestSecure' + hadoopVersion, 'yamlRestTestWeb' + hadoopVersion]) {
tasks.register(integTestTaskName, RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with HDFS" + hadoopVer

@@ -207,6 +211,7 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
Map<String, Object> expansions = [
'hdfs_port' : getNonSecureNamenodePortForVersion(hadoopVer),
'secure_hdfs_port': getSecureNamenodePortForVersion(hadoopVer),
'hdfs_http_port' : getHttpPortForVersion(hadoopVer),
]
inputs.properties(expansions)
filter("tokens": expansions.collectEntries { k, v -> [k, v.toString()]}, ReplaceTokens.class)
@@ -217,6 +222,9 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
it.into("secure_hdfs_repository_" + hadoopVer) {
from "src/yamlRestTest/resources/rest-api-spec/test/secure_hdfs_repository"
}
it.into("webhdfs_repository_" + hadoopVer) {
from "src/yamlRestTest/resources/rest-api-spec/test/webhdfs_repository"
}
}
tasks.named("processYamlRestTestResources").configure {
dependsOn(processHadoopTestResources)
@@ -234,6 +242,11 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
// The normal test runner only runs the standard hdfs rest tests
systemProperty 'tests.rest.suite', 'hdfs_repository_' + hadoopVer
}
tasks.named("yamlRestTestWeb" + hadoopVer).configure {
dependsOn "hdfs" + hadoopVer + "Fixture"
// The normal test runner only runs the standard hdfs rest tests
systemProperty 'tests.rest.suite', 'hdfs_repository_' + hadoopVer
Member: Should this be the webhdfs rest tests instead of the hdfs ones?
}
tasks.named("javaRestTest" + hadoopVer).configure {
dependsOn "haHdfs" + hadoopVer + "Fixture"
}
@@ -243,19 +256,27 @@ for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoo
systemProperty 'tests.rest.suite', 'hdfs_repository_' + hadoopVer + '/10_basic'
}
// HA fixture is unsupported. Don't run them.
tasks.named("yamlRestTestSecure" + hadoopVer).configure {
Member: Small nit: can we move this down a little to rectify the code comment above it? The HA tests are done by the javaRestTest* tasks, and while we may justifiably want to disable all the yamlRestTest* tasks here, the comment placement is inaccurate.

This build file is really complicated; I normally wouldn't bring this kind of thing up otherwise.
enabled = false
}
tasks.named("javaRestTestSecure" + hadoopVer).configure {
enabled = false
}
}

tasks.named("check").configure {
dependsOn("yamlRestTest" + hadoopVer, "yamlRestTestSecure" + hadoopVer, "javaRestTestSecure" + hadoopVer)
dependsOn("yamlRestTest" + hadoopVer, "yamlRestTestSecure" + hadoopVer, "javaRestTestSecure" + hadoopVer, "yamlRestTestWeb" + hadoopVer)
}

// Run just the secure hdfs rest test suite.
tasks.named("yamlRestTestSecure" + hadoopVer).configure {
systemProperty 'tests.rest.suite', 'secure_hdfs_repository_' + hadoopVer
}

// Run just the secure webhdfs rest test suite.
tasks.named("yamlRestTestWeb" + hadoopVer).configure {
systemProperty 'tests.rest.suite', 'webhdfs_repository_' + hadoopVer
}
}


@@ -267,6 +288,10 @@ def getNonSecureNamenodePortForVersion(hadoopVersion) {
return 10003 - (2 * hadoopVersion)
}

def getHttpPortForVersion(hadoopVersion) {
return 10004 - (2 * hadoopVersion)
Member: I tried running the tests locally and the fixture does not start due to an already used port. I think these port assignment functions need to be updated to use an offset of 3 instead of 2 now.
}
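
To make the port collision concrete, here is a minimal hedged sketch (not part of the PR). Only the non-secure and http formulas appear in this diff; the secure formula (10002 - 2 * version) and the tested Hadoop versions (2 and 3) are assumptions:

```java
// Hypothetical reconstruction of the fixture port assignments.
public class HdfsFixturePortSketch {
    static int nonSecurePort(int v) { return 10003 - 2 * v; } // from this diff
    static int securePort(int v)    { return 10002 - 2 * v; } // assumed, not shown here
    static int httpPort(int v)      { return 10004 - 2 * v; } // added by this PR

    public static void main(String[] args) {
        // With a stride of 2 there is only room for two distinct ports per version,
        // so the new http port of version v + 1 collides with the secure port of
        // version v: 10004 - 2 * (v + 1) == 10002 - 2 * v.
        System.out.println(httpPort(3));   // 9998
        System.out.println(securePort(2)); // 9998 -> same port, so the fixture cannot bind
        // Widening the stride to 3 (and adjusting the bases) leaves room for three
        // ports per version, which is what the reviewer suggests.
    }
}
```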

Set disabledIntegTestTaskNames = []

tasks.withType(RestIntegTestTask).configureEach { testTask ->
@@ -35,7 +35,11 @@ final class HdfsBlobStore implements BlobStore {
this.fileContext = fileContext;
// Only restrict permissions if not running with HA
boolean restrictPermissions = (haEnabled == false);
this.securityContext = new HdfsSecurityContext(fileContext.getUgi(), restrictPermissions);
this.securityContext = new HdfsSecurityContext(
fileContext.getUgi(),
restrictPermissions,
fileContext.getDefaultFileSystem().getUri().getScheme()
);
this.bufferSize = bufferSize;
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
this.readOnly = readOnly;
@@ -39,6 +39,7 @@
import java.net.UnknownHostException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Locale;

public final class HdfsRepository extends BlobStoreRepository {
@@ -47,6 +48,8 @@ public final class HdfsRepository extends BlobStoreRepository {

private static final String CONF_SECURITY_PRINCIPAL = "security.principal";

private static final List<String> allowedSchemes = List.of("hdfs", "webhdfs", "swebhdfs");

private final Environment environment;
private final ByteSizeValue chunkSize;
private final URI uri;
@@ -70,17 +73,18 @@ public HdfsRepository(
throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore");
}
uri = URI.create(uriSetting);
if ("hdfs".equalsIgnoreCase(uri.getScheme()) == false) {
if (uri.getScheme() == null || allowedSchemes.contains(uri.getScheme()) == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore",
"Invalid scheme [%s] specified in uri [%s]; only one of %s uris allowed for hdfs snapshot/restore",
uri.getScheme(),
uriSetting
uriSetting,
String.join(", ", allowedSchemes)
)
);
}
if (Strings.hasLength(uri.getPath()) && uri.getPath().equals("/") == false) {
if (uri.getScheme().equals("hdfs") && Strings.hasLength(uri.getPath()) && uri.getPath().equals("/") == false) {
Member: I don't think this test is specific to hdfs? If the uri is specified as webhdfs://hostname:9870/path/to/repo I think we would want to intercept and advise on that too.

Contributor Author: Good point, that's a bit of wishful thinking on my part. You see, I have a separate PR for Hadoop, apache/hadoop#5447, that would utilize the base path in webhdfs-scheme URIs. I was actually testing this PR combined with my Hadoop PR at one point.

For some context: webhdfs uses regular HTTP(S) requests to interact with the filesystem. The requests are made to the base scheme URI plus a hardcoded prefix, /webhdfs/v1. However, there are some webhdfs proxies, most notably Apache Knox, that expose a different API path. It's also easy to set up a custom proxy, since it's just HTTP.

For Apache Knox, the default path is /gateway/<gateway-name>/webhdfs/v1. Somewhat ironically, Apache Hadoop's client does not currently support this custom path, unlike most third-party clients. In apache/hadoop#5447 I proposed adding support for it by utilizing the path in the connection URI. So, while it's not the same as a filesystem base path, there would be a use case where a user wants to provide a separate path in the connection URI, for example webhdfs://hostname:9870/gateway/myname/webhdfs/v1/. It would only be the path to the API, distinct from the path option.

Anyway, I can make this validation apply to the (s)webhdfs schemes for now if you'd prefer, please let me know. We could then perhaps revisit if Hadoop accepts the change.

throw new IllegalArgumentException(
String.format(
Locale.ROOT,
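
As a side note on the thread above, here is a minimal hedged sketch (not plugin code) of what java.net.URI reports for a plain webhdfs URI versus the Knox-style URI from the discussion, and why the path check in this hunk currently accepts the latter:

```java
import java.net.URI;

// Illustrative only: the host, port, and gateway path are taken from the
// comment thread above, not from the plugin or its tests.
public class WebhdfsUriSketch {
    public static void main(String[] args) {
        URI plain = URI.create("webhdfs://hostname:9870/");
        URI knox = URI.create("webhdfs://hostname:9870/gateway/myname/webhdfs/v1/");

        System.out.println(plain.getScheme() + " " + plain.getPath()); // webhdfs /
        System.out.println(knox.getScheme() + " " + knox.getPath());   // webhdfs /gateway/myname/webhdfs/v1/

        // The check above only rejects a non-root path when the scheme is "hdfs",
        // so the Knox-style URI is accepted as written and its path would be left
        // for the Hadoop client to interpret (the use case apache/hadoop#5447
        // targets), separate from the repository's "path" setting.
    }
}
```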
@@ -111,6 +115,8 @@ private HdfsBlobStore createBlobstore(URI blobstoreUri, String path, Settings re

// Disable FS cache
hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true);
hadoopConfiguration.setBoolean("fs.webhdfs.impl.disable.cache", true);
hadoopConfiguration.setBoolean("fs.swebhdfs.impl.disable.cache", true);

// Create a hadoop user
UserGroupInformation ugi = login(hadoopConfiguration, repositorySettings);
@@ -9,6 +9,7 @@

import org.apache.hadoop.security.UserGroupInformation;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.env.Environment;

import java.io.IOException;
@@ -22,6 +23,7 @@
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Objects;

import javax.security.auth.AuthPermission;
import javax.security.auth.PrivateCredentialPermission;
@@ -37,6 +39,8 @@ class HdfsSecurityContext {

private static final Permission[] SIMPLE_AUTH_PERMISSIONS;
private static final Permission[] KERBEROS_AUTH_PERMISSIONS;
private static final Permission[] WEBHDFS_PERMISSIONS;
private static final Permission[] SWEBHDFS_PERMISSIONS;
static {
// We can do FS ops with only a few elevated permissions:
SIMPLE_AUTH_PERMISSIONS = new Permission[] {
@@ -72,6 +76,16 @@ class HdfsSecurityContext {
// 7) allow code to initiate kerberos connections as the logged in user
// Still far and away fewer permissions than the original full plugin policy
};
WEBHDFS_PERMISSIONS = new Permission[] {
// 1) allow hadoop to act as the logged in Subject
new AuthPermission("doAs")
};
SWEBHDFS_PERMISSIONS = new Permission[] {
// 1) allow hadoop to act as the logged in Subject
new AuthPermission("doAs"),
// 2) allow hadoop to call setSSLSocketFactory on HttpsURLConnection
new RuntimePermission("setFactory"),
};
}

/**
@@ -94,13 +108,13 @@ static Path locateKeytabFile(Environment environment) {
private final boolean restrictPermissions;
private final Permission[] restrictedExecutionPermissions;

HdfsSecurityContext(UserGroupInformation ugi, boolean restrictPermissions) {
HdfsSecurityContext(UserGroupInformation ugi, boolean restrictPermissions, String scheme) {
this.ugi = ugi;
this.restrictPermissions = restrictPermissions;
this.restrictedExecutionPermissions = renderPermissions(ugi);
this.restrictedExecutionPermissions = renderPermissions(ugi, scheme);
}

private Permission[] renderPermissions(UserGroupInformation userGroupInformation) {
private Permission[] renderPermissions(UserGroupInformation userGroupInformation, String scheme) {
Permission[] permissions;
if (userGroupInformation.isFromKeytab()) {
// KERBEROS
@@ -117,6 +131,12 @@ private Permission[] renderPermissions(UserGroupInformation userGroupInformation
// SIMPLE
permissions = Arrays.copyOf(SIMPLE_AUTH_PERMISSIONS, SIMPLE_AUTH_PERMISSIONS.length);
}
if (Objects.equals(scheme, "webhdfs")) {
permissions = ArrayUtils.concat(permissions, WEBHDFS_PERMISSIONS, Permission.class);
} else if (Objects.equals(scheme, "swebhdfs")) {
permissions = ArrayUtils.concat(permissions, SWEBHDFS_PERMISSIONS, Permission.class);
}

return permissions;
}

@@ -68,4 +68,8 @@ grant {
// client binds to the address returned from the host name of any principal set up as a service principal
// org.apache.hadoop.ipc.Client.Connection.setupConnection
permission java.net.SocketPermission "localhost:0", "listen,resolve";

// org.apache.hadoop.hdfs.web.SSLConnectionConfigurator
// This is used by swebhdfs connections
permission java.lang.RuntimePermission "setFactory";
Member: So here's the deal with this permission: frustratingly, Java uses this permission to guard against setting an SSL socket factory on both a connection instance and the global default for all https connections. In fact, it's used to guard against setting all sorts of weird global state in the running JVM.

I'll ask around with our more security-minded folks and either link them here for their blessing on this, or collect feedback and share it here. I imagine that using webhdfs without SSL support is not exactly an attractive prospect. There is some prior art in the code base that might support this usage, but I need to double-check.

Contributor: While not ideal, the grant here seems reasonable (given the usage).

I'm less concerned with the grant of the setFactory permission than I would be with other already granted permissions. And we already "trust" several other plugins and their dependencies with this permission.

};
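
To illustrate the reviewer's point about what this permission guards, here is a hedged sketch (not plugin code); the URL is a placeholder, and both calls below go through SecurityManager.checkSetFactory(), which RuntimePermission("setFactory") satisfies:

```java
import java.net.URL;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;

public class SetFactorySketch {
    public static void main(String[] args) throws Exception {
        SSLContext ctx = SSLContext.getDefault();

        // Per-connection factory: the kind of call a connection configurator such as
        // org.apache.hadoop.hdfs.web.SSLConnectionConfigurator makes for swebhdfs://.
        HttpsURLConnection conn = (HttpsURLConnection) new URL("https://localhost:9871/").openConnection();
        conn.setSSLSocketFactory(ctx.getSocketFactory());

        // Global default for every HttpsURLConnection in the JVM, guarded by the same
        // permission; this is the "weird global state" concern mentioned above.
        HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
    }
}
```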
@@ -196,6 +196,36 @@ public void testPathSpecifiedInHdfs() {
}
}

public void testWebhdfsIsAllowedScheme() {
client().admin()
.cluster()
.preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(
Settings.builder()
.put("uri", "webhdfs:///")
.put("conf.fs.AbstractFileSystem.webhdfs.impl", TestingFs.class.getName())
.put("path", "foo")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
)
.get();
}

public void testSwebhdfsIsAllowedScheme() {
client().admin()
.cluster()
.preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(
Settings.builder()
.put("uri", "swebhdfs:///")
.put("conf.fs.AbstractFileSystem.swebhdfs.impl", TestingFs.class.getName())
.put("path", "foo")
)
.get();
}

public void testMissingPath() {
try {
client().admin()
@@ -19,7 +19,7 @@
---
#
# Check that we can't use file:// repositories or anything like that
# We only test this plugin against hdfs://
# We only test this plugin against hdfs:// and webhdfs://
#
"HDFS only":
- do:
@@ -19,7 +19,7 @@
---
#
# Check that we can't use file:// repositories or anything like that
# We only test this plugin against hdfs://
# We only test this plugin against hdfs:// and webhdfs://
#
"HDFS only":
- do:
@@ -0,0 +1,33 @@
# Integration tests for HDFS Repository plugin
#
# Check plugin is installed
#
"Plugin loaded":
- skip:
reason: "contains is a newly added assertion"
features: contains
- do:
cluster.state: {}

# Get master node id
- set: { master_node: master }

- do:
nodes.info: {}

- contains: { nodes.$master.plugins: { name: repository-hdfs } }
---
#
# Check that we can't use file:// repositories or anything like that
# We only test this plugin against hdfs:// and webhdfs://
#
"HDFS only":
- do:
catch: /Invalid scheme/
snapshot.create_repository:
repository: misconfigured_repository
body:
type: hdfs
settings:
uri: "file://bogus"
path: "foo/bar"
@@ -0,0 +1,27 @@
# Integration tests for HDFS Repository plugin
#
# Tests creating a repository
#
"HDFS Repository Creation":
# Create repository
- do:
snapshot.create_repository:
repository: test_repository_create
body:
type: hdfs
settings:
uri: "webhdfs://localhost:@hdfs_http_port@"
path: "test/repository_create"

# Get repository
- do:
snapshot.get_repository:
repository: test_repository_create

- is_true: test_repository_create
- match: {test_repository_create.settings.path : "test/repository_create"}

# Remove our repository
- do:
snapshot.delete_repository:
repository: test_repository_create