diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java index 147fb62c735..80c3c838fce 100644 --- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java +++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.cassandra; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.JdkSSLOptions; import com.datastax.driver.core.ProtocolOptions.Compression; import com.datastax.driver.core.Session; import org.apache.zeppelin.interpreter.Interpreter; @@ -29,6 +30,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.KeyStore; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -106,6 +113,12 @@ public class CassandraInterpreter extends Interpreter { "cassandra.socket.soLinger"; public static final String CASSANDRA_SOCKET_TCP_NO_DELAY = "cassandra.socket.tcp.no_delay"; + public static final String CASSANDRA_WITH_SSL = + "cassandra.ssl.enabled"; + public static final String CASSANDRA_TRUSTSTORE_PATH = + "cassandra.ssl.truststore.path"; + public static final String CASSANDRA_TRUSTSTORE_PASSWORD = + "cassandra.ssl.truststore.password"; public static final String DEFAULT_HOST = "localhost"; public static final String DEFAULT_PORT = "9042"; @@ -139,10 +152,14 @@ public class CassandraInterpreter extends Interpreter { public static final String LOGGING_DEFAULT_RETRY = "LOGGING_DEFAULT"; public static final String LOGGING_DOWNGRADING_RETRY = "LOGGING_DOWNGRADING"; public static final String LOGGING_FALLTHROUGH_RETRY = "LOGGING_FALLTHROUGH"; + public static final String DEFAULT_WITH_SSL = "false"; + public static final String DEFAULT_TRUSTSTORE_PATH = "none"; + public static final String DEFAULT_TRUSTSTORE_PASSWORD = "none"; public static final List NO_COMPLETION = new ArrayList<>(); InterpreterLogic helper; + Cluster.Builder clusterBuilder; Cluster cluster; Session session; private JavaDriverConfig driverConfig = new JavaDriverConfig(); @@ -253,6 +270,12 @@ public CassandraInterpreter(Properties properties) { "Cassandra socket read timeout in millisecs. Default = 12000") .add(CASSANDRA_SOCKET_TCP_NO_DELAY, DEFAULT_TCP_NO_DELAY, "Cassandra socket TCP no delay. Default = true") + .add(CASSANDRA_WITH_SSL, DEFAULT_WITH_SSL, + "Cassandra SSL") + .add(CASSANDRA_TRUSTSTORE_PATH, DEFAULT_TRUSTSTORE_PATH, + "Cassandra truststore path. Default = none") + .add(CASSANDRA_TRUSTSTORE_PASSWORD, DEFAULT_TRUSTSTORE_PASSWORD, + "Cassandra truststore password. Default = none") .build()); } @@ -271,7 +294,7 @@ public void open() { Compression compression = driverConfig.getCompressionProtocol(this); - cluster = Cluster.builder() + clusterBuilder = Cluster.builder() .addContactPoints(addresses) .withPort(port) .withProtocolVersion(driverConfig.getProtocolVersion(this)) @@ -287,8 +310,36 @@ public void open() { parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS))) .withPoolingOptions(driverConfig.getPoolingOptions(this)) .withQueryOptions(driverConfig.getQueryOptions(this)) - .withSocketOptions(driverConfig.getSocketOptions(this)) - .build(); + .withSocketOptions(driverConfig.getSocketOptions(this)); + + if (getProperty(CASSANDRA_WITH_SSL).equals("true")) { + LOGGER.debug("Cassandra Interpreter: Using SSL"); + + try { + final SSLContext sslContext; + { + final KeyStore trustStore = KeyStore.getInstance("JKS"); + final InputStream stream = Files.newInputStream(Paths.get( + getProperty(CASSANDRA_TRUSTSTORE_PATH))); + trustStore.load(stream, getProperty(CASSANDRA_TRUSTSTORE_PASSWORD).toCharArray()); + + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, trustManagerFactory.getTrustManagers(), null); + } + clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder().withSSLContext( + sslContext).build()); + } catch (Exception e) { + LOGGER.error(e.toString()); + } + } else { + LOGGER.debug("Cassandra Interpreter: Not using SSL"); + } + + cluster = clusterBuilder.build(); session = cluster.connect(); helper = new InterpreterLogic(session);