diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template index a9eccf6062a..7bc38d633e1 100644 --- a/conf/zeppelin-env.sh.template +++ b/conf/zeppelin-env.sh.template @@ -56,7 +56,7 @@ ## Kerberos ticket refresh setting ## #export KINIT_FAIL_THRESHOLD # (optional) How many times should kinit retry. The default value is 5. -#export LAUNCH_KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d. +#export KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d. ## Use provided spark installation ## ## defining SPARK_HOME makes Zeppelin run spark interpreter process using spark-submit diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 72d7981764d..948914ff190 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -50,6 +50,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.KerberosInterpreter; import org.apache.zeppelin.interpreter.ResultMessages; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl; @@ -89,7 +90,7 @@ * } *

*/ -public class JDBCInterpreter extends Interpreter { +public class JDBCInterpreter extends KerberosInterpreter { private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class); @@ -147,12 +148,29 @@ public JDBCInterpreter(Properties property) { maxLineResults = MAX_LINE_DEFAULT; } + @Override + protected boolean runKerberosLogin() { + try { + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + return true; + } else if (UserGroupInformation.isLoginTicketBased()) { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + return true; + } + } catch (Exception e) { + logger.error("Unable to run kinit for zeppelin", e); + } + return false; + } + public HashMap getPropertiesMap() { return basePropretiesMap; } @Override public void open() { + super.open(); for (String propertyKey : property.stringPropertyNames()) { logger.debug("propertyKey: {}", propertyKey); String[] keyValue = propertyKey.split("\\.", 2); @@ -190,6 +208,16 @@ public void open() { setMaxLineResults(); } + + protected boolean isKerboseEnabled() { + UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property); + if (authType.equals(KERBEROS)) { + return true; + } + return false; + } + + private void setMaxLineResults() { if (basePropretiesMap.containsKey(COMMON_KEY) && basePropretiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) { @@ -259,6 +287,7 @@ private void initConnectionPoolMap() { @Override public void close() { + super.close(); try { initStatementMap(); initConnectionPoolMap(); @@ -709,49 +738,17 @@ private InterpreterResult executeSql(String propertyKey, String sql, } getJDBCConfiguration(user).removeStatement(paragraphId); } catch (Throwable e) { - if (e.getCause() instanceof TTransportException && - Throwables.getStackTraceAsString(e).contains("GSS") && - getJDBCConfiguration(user).isConnectionInDBDriverPoolSuccessful(propertyKey)) { - return reLoginFromKeytab(propertyKey, sql, interpreterContext, interpreterResult); - } else { - logger.error("Cannot run " + sql, e); - String errorMsg = Throwables.getStackTraceAsString(e); - try { - closeDBPool(user, propertyKey); - } catch (SQLException e1) { - logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1); - } - interpreterResult.add(errorMsg); - return new InterpreterResult(Code.ERROR, interpreterResult.message()); - } - } - return interpreterResult; - } - - private InterpreterResult reLoginFromKeytab(String propertyKey, String sql, - InterpreterContext interpreterContext, InterpreterResult interpreterResult) { - String user = interpreterContext.getAuthenticationInfo().getUser(); - try { - closeDBPool(user, propertyKey); - } catch (SQLException e) { - logger.error("Error, could not close DB pool in reLoginFromKeytab ", e); - } - UserGroupInformation.AuthenticationMethod authType = - JDBCSecurityImpl.getAuthtype(property); - if (authType.equals(KERBEROS)) { + logger.error("Cannot run " + sql, e); + String errorMsg = Throwables.getStackTraceAsString(e); try { - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else if (UserGroupInformation.isLoginTicketBased()) { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); - } - } catch (IOException e) { - logger.error("Cannot reloginFromKeytab " + sql, e); - interpreterResult.add(e.getMessage()); - return new InterpreterResult(Code.ERROR, interpreterResult.message()); + closeDBPool(user, propertyKey); + } catch (SQLException e1) { + logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1); } + interpreterResult.add(errorMsg); + return new InterpreterResult(Code.ERROR, interpreterResult.message()); } - return executeSql(propertyKey, sql, interpreterContext); + return interpreterResult; } /** diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java index 79fc3a35bed..07eed5f9ef1 100644 --- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java +++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java @@ -31,13 +31,13 @@ import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.KerberosInterpreter; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.apache.zeppelin.shell.security.ShellSecurityImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,17 +57,14 @@ public ShellInterpreter(Properties property) { @Override public void open() { + super.open(); LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY)); executors = new ConcurrentHashMap<>(); - if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) { - startKerberosLoginThread(); - } } @Override public void close() { - shutdownExecutorService(); - + super.close(); for (String executorKey : executors.keySet()) { DefaultExecutor executor = executors.remove(executorKey); if (executor != null) { @@ -163,12 +160,38 @@ public List completion(String buf, int cursor, @Override protected boolean runKerberosLogin() { try { - ShellSecurityImpl.createSecureConfiguration(getProperty(), shell); + createSecureConfiguration(); + return true; } catch (Exception e) { LOGGER.error("Unable to run kinit for zeppelin", e); - return false; } - return true; + return false; + } + + public void createSecureConfiguration() { + Properties properties = getProperty(); + CommandLine cmdLine = CommandLine.parse(shell); + cmdLine.addArgument("-c", false); + String kinitCommand = String.format("kinit -k -t %s %s", + properties.getProperty("zeppelin.shell.keytab.location"), + properties.getProperty("zeppelin.shell.principal")); + cmdLine.addArgument(kinitCommand, false); + DefaultExecutor executor = new DefaultExecutor(); + try { + executor.execute(cmdLine); + } catch (Exception e) { + LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e); + throw new InterpreterException(e); + } + } + + @Override + protected boolean isKerboseEnabled() { + if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type")) && getProperty( + "zeppelin.shell.auth.type").equalsIgnoreCase("kerberos")) { + return true; + } + return false; } } diff --git a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java b/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java deleted file mode 100644 index ecfdb0c28d8..00000000000 --- a/shell/src/main/java/org/apache/zeppelin/shell/security/ShellSecurityImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.shell.security; - -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Properties; - - -/*** - * Shell security helper - */ -public class ShellSecurityImpl { - - private static Logger LOGGER = LoggerFactory.getLogger(ShellSecurityImpl.class); - - public static void createSecureConfiguration(Properties properties, String shell) { - - String authType = properties.getProperty("zeppelin.shell.auth.type") - .trim().toUpperCase(); - - switch (authType) { - case "KERBEROS": - CommandLine cmdLine = CommandLine.parse(shell); - cmdLine.addArgument("-c", false); - String kinitCommand = String.format("kinit -k -t %s %s", - properties.getProperty("zeppelin.shell.keytab.location"), - properties.getProperty("zeppelin.shell.principal")); - cmdLine.addArgument(kinitCommand, false); - DefaultExecutor executor = new DefaultExecutor(); - - try { - int exitVal = executor.execute(cmdLine); - } catch (Exception e) { - LOGGER.error("Unable to run kinit for zeppelin user " + kinitCommand, e); - throw new InterpreterException(e); - } - } - } -} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java index 4673e480738..4da5ef575c7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java @@ -31,15 +31,24 @@ /** * Interpreter wrapper for Kerberos initialization * - * runKerberosLogin() method you need to implement that determine Zeppelin's behavior. + * runKerberosLogin() method you need to implement that determine how should this interpeter do a + * kinit for this interpreter. + * isKerboseEnabled() method needs to implement which determines if the kerberos is enabled for that + * interpreter. * startKerberosLoginThread() needs to be called inside the open() and * shutdownExecutorService() inside close(). + * + * + * Environment variables defined in zeppelin-env.sh + * KERBEROS_REFRESH_INTERVAL controls the refresh interval for Kerberos ticket. The default value + * is 1d. + * KINIT_FAIL_THRESHOLD controls how many times should kinit retry. The default value is 5. */ public abstract class KerberosInterpreter extends Interpreter { private Integer kinitFailCount = 0; - protected ScheduledExecutorService scheduledExecutorService; - public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class); + private ScheduledExecutorService scheduledExecutorService; + private static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class); public KerberosInterpreter(Properties property) { super(property); @@ -48,23 +57,54 @@ public KerberosInterpreter(Properties property) { @ZeppelinApi protected abstract boolean runKerberosLogin(); - public String getKerberosRefreshInterval() { - if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) { - return "1d"; - } else { - return System.getenv("KERBEROS_REFRESH_INTERVAL"); + @ZeppelinApi + protected abstract boolean isKerboseEnabled(); + + public void open() { + if (isKerboseEnabled()) { + startKerberosLoginThread(); + } + } + + public void close() { + if (isKerboseEnabled()) { + shutdownExecutorService(); } } - public Integer kinitFailThreshold() { - if (System.getenv("KINIT_FAIL_THRESHOLD") == null) { - return 5; - } else { - return new Integer(System.getenv("KINIT_FAIL_THRESHOLD")); + private Long getKerberosRefreshInterval() { + Long refreshInterval; + String refreshIntervalString = "1d"; + //defined in zeppelin-env.sh, if not initialized then the default value is one day. + if (System.getenv("KERBEROS_REFRESH_INTERVAL") != null) { + refreshIntervalString = System.getenv("KERBEROS_REFRESH_INTERVAL"); + } + try { + refreshInterval = getTimeAsMs(refreshIntervalString); + } catch (IllegalArgumentException e) { + logger.error("Cannot get time in MS for the given string, " + refreshIntervalString + + " defaulting to 1d ", e); + refreshInterval = getTimeAsMs("1d"); + } + + return refreshInterval; + } + + private Integer kinitFailThreshold() { + Integer kinitFailThreshold = 5; + //defined in zeppelin-env.sh, if not initialized then the default value is 5. + if (System.getenv("KINIT_FAIL_THRESHOLD") != null) { + try { + kinitFailThreshold = new Integer(System.getenv("KINIT_FAIL_THRESHOLD")); + } catch (Exception e) { + logger.error("Cannot get integer value from the given string, " + System + .getenv("KINIT_FAIL_THRESHOLD") + " defaulting to " + kinitFailThreshold, e); + } } + return kinitFailThreshold; } - public Long getTimeAsMs(String time) { + private Long getTimeAsMs(String time) { if (time == null) { logger.error("Cannot convert to time value.", time); time = "1d"; @@ -86,10 +126,10 @@ public Long getTimeAsMs(String time) { suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS); } - protected ScheduledExecutorService startKerberosLoginThread() { + private ScheduledExecutorService startKerberosLoginThread() { scheduledExecutorService = Executors.newScheduledThreadPool(1); - scheduledExecutorService.schedule(new Callable() { + scheduledExecutorService.submit(new Callable() { public Object call() throws Exception { if (runKerberosLogin()) { @@ -97,7 +137,7 @@ public Object call() throws Exception { kinitFailCount = 0; // schedule another kinit run with a fixed delay. scheduledExecutorService - .schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS); + .schedule(this, getKerberosRefreshInterval(), TimeUnit.MILLISECONDS); } else { kinitFailCount++; logger.info("runKerberosLogin failed for " + kinitFailCount + " time(s)."); @@ -111,12 +151,12 @@ public Object call() throws Exception { } return null; } - }, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS); + }); return scheduledExecutorService; } - protected void shutdownExecutorService() { + private void shutdownExecutorService() { if (scheduledExecutorService != null) { scheduledExecutorService.shutdown(); }