Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/zeppelin-env.sh.template
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
## Kerberos ticket refresh setting
##
#export KINIT_FAIL_THRESHOLD # (optional) How many times should kinit retry. The default value is 5.
#export LAUNCH_KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
#export KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d.

## Use provided spark installation ##
## defining SPARK_HOME makes Zeppelin run spark interpreter process using spark-submit
Expand Down
79 changes: 38 additions & 41 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.KerberosInterpreter;
import org.apache.zeppelin.interpreter.ResultMessages;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
Expand Down Expand Up @@ -89,7 +90,7 @@
* }
* </p>
*/
public class JDBCInterpreter extends Interpreter {
public class JDBCInterpreter extends KerberosInterpreter {

private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);

Expand Down Expand Up @@ -147,12 +148,29 @@ public JDBCInterpreter(Properties property) {
maxLineResults = MAX_LINE_DEFAULT;
}

@Override
protected boolean runKerberosLogin() {
try {
if (UserGroupInformation.isLoginKeytabBased()) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
return true;
} else if (UserGroupInformation.isLoginTicketBased()) {
UserGroupInformation.getLoginUser().reloginFromTicketCache();
return true;
}
} catch (Exception e) {
logger.error("Unable to run kinit for zeppelin", e);
}
return false;
}

public HashMap<String, Properties> getPropertiesMap() {
return basePropretiesMap;
}

@Override
public void open() {
super.open();
for (String propertyKey : property.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
Expand Down Expand Up @@ -190,6 +208,16 @@ public void open() {
setMaxLineResults();
}


protected boolean isKerboseEnabled() {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
if (authType.equals(KERBEROS)) {
return true;
}
return false;
}


private void setMaxLineResults() {
if (basePropretiesMap.containsKey(COMMON_KEY) &&
basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) {
Expand Down Expand Up @@ -259,6 +287,7 @@ private void initConnectionPoolMap() {

@Override
public void close() {
super.close();
try {
initStatementMap();
initConnectionPoolMap();
Expand Down Expand Up @@ -709,49 +738,17 @@ private InterpreterResult executeSql(String propertyKey, String sql,
}
getJDBCConfiguration(user).removeStatement(paragraphId);
} catch (Throwable e) {
if (e.getCause() instanceof TTransportException &&
Throwables.getStackTraceAsString(e).contains("GSS") &&
getJDBCConfiguration(user).isConnectionInDBDriverPoolSuccessful(propertyKey)) {
return reLoginFromKeytab(propertyKey, sql, interpreterContext, interpreterResult);
} else {
logger.error("Cannot run " + sql, e);
String errorMsg = Throwables.getStackTraceAsString(e);
try {
closeDBPool(user, propertyKey);
} catch (SQLException e1) {
logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
}
interpreterResult.add(errorMsg);
return new InterpreterResult(Code.ERROR, interpreterResult.message());
}
}
return interpreterResult;
}

private InterpreterResult reLoginFromKeytab(String propertyKey, String sql,
InterpreterContext interpreterContext, InterpreterResult interpreterResult) {
String user = interpreterContext.getAuthenticationInfo().getUser();
try {
closeDBPool(user, propertyKey);
} catch (SQLException e) {
logger.error("Error, could not close DB pool in reLoginFromKeytab ", e);
}
UserGroupInformation.AuthenticationMethod authType =
JDBCSecurityImpl.getAuthtype(property);
if (authType.equals(KERBEROS)) {
logger.error("Cannot run " + sql, e);
String errorMsg = Throwables.getStackTraceAsString(e);
try {
if (UserGroupInformation.isLoginKeytabBased()) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
} else if (UserGroupInformation.isLoginTicketBased()) {
UserGroupInformation.getLoginUser().reloginFromTicketCache();
}
} catch (IOException e) {
logger.error("Cannot reloginFromKeytab " + sql, e);
interpreterResult.add(e.getMessage());
return new InterpreterResult(Code.ERROR, interpreterResult.message());
closeDBPool(user, propertyKey);
} catch (SQLException e1) {
logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
}
interpreterResult.add(errorMsg);
return new InterpreterResult(Code.ERROR, interpreterResult.message());
}
return executeSql(propertyKey, sql, interpreterContext);
return interpreterResult;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.KerberosInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.shell.security.ShellSecurityImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,17 +57,14 @@ public ShellInterpreter(Properties property) {

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move createSecureConfiguration from ShellSecurityImpl to ShellInterpreter, then ShellSecurityImpl could be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done.

public void open() {
super.open();
LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
executors = new ConcurrentHashMap<>();
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
startKerberosLoginThread();
}
}

@Override
public void close() {
shutdownExecutorService();

super.close();
for (String executorKey : executors.keySet()) {
DefaultExecutor executor = executors.remove(executorKey);
if (executor != null) {
Expand Down Expand Up @@ -163,12 +160,38 @@ public List<InterpreterCompletion> completion(String buf, int cursor,
@Override
protected boolean runKerberosLogin() {
try {
ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
createSecureConfiguration();
return true;
} catch (Exception e) {
LOGGER.error("Unable to run kinit for zeppelin", e);
return false;
}
return true;
return false;
}

public void createSecureConfiguration() {
Properties properties = getProperty();
CommandLine cmdLine = CommandLine.parse(shell);
cmdLine.addArgument("-c", false);
String kinitCommand = String.format("kinit -k -t %s %s",
properties.getProperty("zeppelin.shell.keytab.location"),
properties.getProperty("zeppelin.shell.principal"));
cmdLine.addArgument(kinitCommand, false);
DefaultExecutor executor = new DefaultExecutor();
try {
executor.execute(cmdLine);
} catch (Exception e) {
LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e);
throw new InterpreterException(e);
}
}

@Override
protected boolean isKerboseEnabled() {
if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty(
"zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) {
return true;
}
return false;
}

}

This file was deleted.

Loading