Skip to content

Commit dff4015

Browse files
iveselovskiyvozerov-gridgain
authored andcommitted
IGNITE-2195: Implemented Hadoop FileSystem factory capable of working with kerberized file systems. This closes apache#464.
1 parent 26bd1df commit dff4015

File tree

5 files changed

+339
-6
lines changed

5 files changed

+339
-6
lines changed

modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public BasicHadoopFileSystemFactory() {
6666

6767
/** {@inheritDoc} */
6868
@Override public FileSystem get(String usrName) throws IOException {
69-
return create0(IgfsUtils.fixUserName(usrName));
69+
return get0(IgfsUtils.fixUserName(usrName));
7070
}
7171

7272
/**
@@ -76,7 +76,7 @@ public BasicHadoopFileSystemFactory() {
7676
* @return File system.
7777
* @throws IOException If failed.
7878
*/
79-
protected FileSystem create0(String usrName) throws IOException {
79+
protected FileSystem get0(String usrName) throws IOException {
8080
assert cfg != null;
8181

8282
try {
@@ -87,12 +87,12 @@ protected FileSystem create0(String usrName) throws IOException {
8787
ClassLoader clsLdr = getClass().getClassLoader();
8888

8989
if (ctxClsLdr == clsLdr)
90-
return FileSystem.get(fullUri, cfg, usrName);
90+
return create(usrName);
9191
else {
9292
Thread.currentThread().setContextClassLoader(clsLdr);
9393

9494
try {
95-
return FileSystem.get(fullUri, cfg, usrName);
95+
return create(usrName);
9696
}
9797
finally {
9898
Thread.currentThread().setContextClassLoader(ctxClsLdr);
@@ -106,6 +106,18 @@ protected FileSystem create0(String usrName) throws IOException {
106106
}
107107
}
108108

109+
/**
110+
* Internal file system creation routine, invoked in correct class loader context.
111+
*
112+
* @param usrName User name.
113+
* @return File system.
114+
* @throws IOException If failed.
115+
* @throws InterruptedException if the current thread is interrupted.
116+
*/
117+
protected FileSystem create(String usrName) throws IOException, InterruptedException {
118+
return FileSystem.get(fullUri, cfg, usrName);
119+
}
120+
109121
/**
110122
* Gets file system URI.
111123
* <p>
@@ -152,7 +164,7 @@ public void setUri(@Nullable String uri) {
152164
*
153165
* @param cfgPaths Paths to file system configuration files.
154166
*/
155-
public void setConfigPaths(String... cfgPaths) {
167+
public void setConfigPaths(@Nullable String... cfgPaths) {
156168
this.cfgPaths = cfgPaths;
157169
}
158170

modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
4747
private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
4848
new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
4949
@Override public FileSystem createValue(String key) throws IOException {
50-
return create0(key);
50+
return get0(key);
5151
}
5252
}
5353
);
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.hadoop.fs;
19+
20+
import org.apache.hadoop.fs.FileSystem;
21+
import org.apache.hadoop.security.UserGroupInformation;
22+
import org.apache.ignite.IgniteException;
23+
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
24+
import org.apache.ignite.internal.util.typedef.F;
25+
import org.apache.ignite.internal.util.typedef.internal.A;
26+
import org.apache.ignite.internal.util.typedef.internal.U;
27+
import org.jetbrains.annotations.Nullable;
28+
29+
import java.io.IOException;
30+
import java.io.ObjectInput;
31+
import java.io.ObjectOutput;
32+
import java.security.PrivilegedExceptionAction;
33+
34+
/**
35+
* Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
36+
* It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user.
37+
* Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details.
38+
* The principal and the key tab name to be used for Kerberos authentication are set explicitly
39+
* in the factory configuration.
40+
*
41+
* <p>This factory does not cache any file system instances. If {@code "fs.[prefix].impl.disable.cache"} is set
42+
* to {@code true}, file system instances will be cached by Hadoop.
43+
*/
44+
public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory {
45+
/** */
46+
private static final long serialVersionUID = 0L;
47+
48+
/** The default interval used to re-login from the key tab, in milliseconds. */
49+
public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
50+
51+
/** Keytab full file name. */
52+
private String keyTab;
53+
54+
/** Keytab principal. */
55+
private String keyTabPrincipal;
56+
57+
/** The re-login interval. See {@link #getReloginInterval()} for more information. */
58+
private long reloginInterval = DFLT_RELOGIN_INTERVAL;
59+
60+
/** Time of last re-login attempt, in system milliseconds. */
61+
private transient volatile long lastReloginTime;
62+
63+
/**
64+
* Constructor.
65+
*/
66+
public KerberosHadoopFileSystemFactory() {
67+
// No-op.
68+
}
69+
70+
/** {@inheritDoc} */
71+
@Override public FileSystem get(String userName) throws IOException {
72+
reloginIfNeeded();
73+
74+
return super.get(userName);
75+
}
76+
77+
/** {@inheritDoc} */
78+
@Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
79+
UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
80+
UserGroupInformation.getLoginUser());
81+
82+
return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
83+
@Override public FileSystem run() throws Exception {
84+
return FileSystem.get(fullUri, cfg);
85+
}
86+
});
87+
}
88+
89+
/**
90+
* Gets the key tab principal short name (e.g. "hdfs").
91+
*
92+
* @return The key tab principal.
93+
*/
94+
@Nullable public String getKeyTabPrincipal() {
95+
return keyTabPrincipal;
96+
}
97+
98+
/**
99+
* Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
100+
*
101+
* @param keyTabPrincipal The key tab principal name.
102+
*/
103+
public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
104+
this.keyTabPrincipal = keyTabPrincipal;
105+
}
106+
107+
/**
108+
* Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
109+
*
110+
* @return The key tab file name.
111+
*/
112+
@Nullable public String getKeyTab() {
113+
return keyTab;
114+
}
115+
116+
/**
117+
* Sets the key tab file name. See {@link #getKeyTab()} for more information.
118+
*
119+
* @param keyTab The key tab file name.
120+
*/
121+
public void setKeyTab(@Nullable String keyTab) {
122+
this.keyTab = keyTab;
123+
}
124+
125+
/**
126+
* The interval used to re-login from the key tab, in milliseconds.
127+
* Important that the value should not be larger than the Kerberos ticket life time multiplied by 0.2. This is
128+
* because the ticket renew window starts from {@code 0.8 * ticket life time}.
129+
* Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
130+
* is obeys this rule well.
131+
*
132+
* <p>Zero value means that re-login should be attempted on each file system operation.
133+
* Negative values are not allowed.
134+
*
135+
* <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
136+
* login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
137+
* have passed since the time of the previous login.
138+
* See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
139+
* more detail.
140+
*
141+
* @return The re-login interval, in milliseconds.
142+
*/
143+
public long getReloginInterval() {
144+
return reloginInterval;
145+
}
146+
147+
/**
148+
* Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
149+
*
150+
* @param reloginInterval The re-login interval, in milliseconds.
151+
*/
152+
public void setReloginInterval(long reloginInterval) {
153+
this.reloginInterval = reloginInterval;
154+
}
155+
156+
/** {@inheritDoc} */
157+
@Override public void start() throws IgniteException {
158+
A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
159+
A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
160+
A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
161+
162+
super.start();
163+
164+
try {
165+
UserGroupInformation.setConfiguration(cfg);
166+
UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
167+
}
168+
catch (IOException ioe) {
169+
throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
170+
", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
171+
}
172+
}
173+
174+
/**
175+
* Re-logins the user if needed.
176+
* First, the re-login interval defined in factory is checked. The re-login attempts will be not more
177+
* frequent than one attempt per {@code reloginInterval}.
178+
* Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing
179+
* TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
180+
*
181+
* <p>This operation expected to be called upon each operation with the file system created with the factory.
182+
* As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
183+
* is no need to invoke it otherwise specially.
184+
*
185+
* @throws IOException If login fails.
186+
*/
187+
private void reloginIfNeeded() throws IOException {
188+
long now = System.currentTimeMillis();
189+
190+
if (now >= lastReloginTime + reloginInterval) {
191+
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
192+
193+
lastReloginTime = now;
194+
}
195+
}
196+
197+
/** {@inheritDoc} */
198+
@Override public void writeExternal(ObjectOutput out) throws IOException {
199+
super.writeExternal(out);
200+
201+
U.writeString(out, keyTab);
202+
U.writeString(out, keyTabPrincipal);
203+
out.writeLong(reloginInterval);
204+
}
205+
206+
/** {@inheritDoc} */
207+
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
208+
super.readExternal(in);
209+
210+
keyTab = U.readString(in);
211+
keyTabPrincipal = U.readString(in);
212+
reloginInterval = in.readLong();
213+
}
214+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package org.apache.ignite.hadoop.fs;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.ByteArrayOutputStream;
5+
import java.io.ObjectInput;
6+
import java.io.ObjectInputStream;
7+
import java.io.ObjectOutput;
8+
import java.io.ObjectOutputStream;
9+
import java.util.concurrent.Callable;
10+
11+
import org.apache.ignite.testframework.GridTestUtils;
12+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
13+
import org.junit.Assert;
14+
15+
/**
16+
* Tests KerberosHadoopFileSystemFactory.
17+
*/
18+
public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
19+
/**
20+
* Test parameters validation.
21+
*
22+
* @throws Exception If failed.
23+
*/
24+
public void testParameters() throws Exception {
25+
checkParameters(null, null, -1);
26+
27+
checkParameters(null, null, 100);
28+
checkParameters(null, "b", -1);
29+
checkParameters("a", null, -1);
30+
31+
checkParameters(null, "b", 100);
32+
checkParameters("a", null, 100);
33+
checkParameters("a", "b", -1);
34+
}
35+
36+
/**
37+
* Check parameters.
38+
*
39+
* @param keyTab Key tab.
40+
* @param keyTabPrincipal Key tab principal.
41+
* @param reloginInterval Re-login interval.
42+
*/
43+
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
44+
private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
45+
final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
46+
47+
fac.setKeyTab(keyTab);
48+
fac.setKeyTabPrincipal(keyTabPrincipal);
49+
fac.setReloginInterval(reloginInterval);
50+
51+
GridTestUtils.assertThrows(null, new Callable<Object>() {
52+
@Override public Object call() throws Exception {
53+
fac.start();
54+
55+
return null;
56+
}
57+
}, IllegalArgumentException.class, null);
58+
}
59+
60+
/**
61+
* Checks serializatuion and deserialization of the secure factory.
62+
*
63+
* @throws Exception If failed.
64+
*/
65+
public void testSerialization() throws Exception {
66+
KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
67+
68+
checkSerialization(fac);
69+
70+
fac = new KerberosHadoopFileSystemFactory();
71+
72+
fac.setUri("igfs://igfs@localhost:10500/");
73+
fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
74+
fac.setKeyTabPrincipal("foo");
75+
fac.setKeyTab("/etc/krb5.keytab");
76+
fac.setReloginInterval(30 * 60 * 1000L);
77+
78+
checkSerialization(fac);
79+
}
80+
81+
/**
82+
* Serializes the factory,
83+
*
84+
* @param fac The facory to check.
85+
* @throws Exception If failed.
86+
*/
87+
private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
88+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
89+
90+
ObjectOutput oo = new ObjectOutputStream(baos);
91+
92+
oo.writeObject(fac);
93+
94+
ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
95+
96+
KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
97+
98+
assertEquals(fac.getUri(), fac2.getUri());
99+
Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
100+
assertEquals(fac.getKeyTab(), fac2.getKeyTab());
101+
assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
102+
assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
103+
}
104+
}

0 commit comments

Comments
 (0)