diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java index a4984ad3032..166bb121ab3 100644 --- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java +++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java @@ -17,11 +17,11 @@ package org.apache.zeppelin.cassandra; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.JdkSSLOptions; import com.datastax.driver.core.ProtocolOptions.Compression; import com.datastax.driver.core.Session; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; @@ -29,11 +29,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.KeyStore; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS; import static java.lang.Integer.parseInt; /** @@ -43,8 +48,8 @@ public class CassandraInterpreter extends Interpreter { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraInterpreter.class); - public static final String CASSANDRA_INTERPRETER_PARALLELISM = "cassandra.interpreter" + - ".parallelism"; + public static final String CASSANDRA_INTERPRETER_PARALLELISM = + "cassandra.interpreter.parallelism"; public static final String CASSANDRA_HOSTS = "cassandra.hosts"; public static final String CASSANDRA_PORT = "cassandra.native.port"; public static final String CASSANDRA_PROTOCOL_VERSION = "cassandra.protocol.version"; @@ -59,21 +64,21 @@ public class CassandraInterpreter extends Interpreter { public static final String CASSANDRA_SPECULATIVE_EXECUTION_POLICY = "cassandra.speculative.execution.policy"; public static final String CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = - "cassandra.max.schema.agreement.wait.second"; + "cassandra.max.schema.agreement.wait.second"; public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL = - "cassandra.pooling.new.connection.threshold.local"; + "cassandra.pooling.new.connection.threshold.local"; public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE = - "cassandra.pooling.new.connection.threshold.remote"; + "cassandra.pooling.new.connection.threshold.remote"; public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL = - "cassandra.pooling.max.connection.per.host.local"; + "cassandra.pooling.max.connection.per.host.local"; public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE = - "cassandra.pooling.max.connection.per.host.remote"; + "cassandra.pooling.max.connection.per.host.remote"; public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL = "cassandra.pooling.core.connection.per.host.local"; public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE = "cassandra.pooling.core.connection.per.host.remote"; public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL = - "cassandra.pooling.max.request.per.connection.local"; + "cassandra.pooling.max.request.per.connection.local"; public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE = "cassandra.pooling.max.request.per.connection.remote"; public static final String CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS = @@ -106,6 +111,13 @@ public class CassandraInterpreter extends Interpreter { "cassandra.socket.soLinger"; public static final String CASSANDRA_SOCKET_TCP_NO_DELAY = "cassandra.socket.tcp.no_delay"; + public static final String CASSANDRA_WITH_SSL = + "cassandra.ssl.enabled"; + public static final String CASSANDRA_TRUSTSTORE_PATH = + "cassandra.ssl.truststore.path"; + public static final String CASSANDRA_TRUSTSTORE_PASSWORD = + "cassandra.ssl.truststore.password"; + public static final String DEFAULT_HOST = "localhost"; public static final String DEFAULT_PORT = "9042"; @@ -143,6 +155,7 @@ public class CassandraInterpreter extends Interpreter { public static final List NO_COMPLETION = new ArrayList<>(); InterpreterLogic helper; + Cluster.Builder clusterBuilder; Cluster cluster; Session session; private JavaDriverConfig driverConfig = new JavaDriverConfig(); @@ -162,29 +175,58 @@ public void open() { } LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + hosts.toString() + - "on port " + port); + "on port " + port); Compression compression = driverConfig.getCompressionProtocol(this); - cluster = Cluster.builder() - .addContactPoints(addresses) - .withPort(port) - .withProtocolVersion(driverConfig.getProtocolVersion(this)) - .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME)) - .withCompression(compression) - .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME), - getProperty(CASSANDRA_CREDENTIALS_PASSWORD)) - .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this)) - .withRetryPolicy(driverConfig.getRetryPolicy(this)) - .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this)) - .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this)) - .withMaxSchemaAgreementWaitSeconds( - parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS))) - .withPoolingOptions(driverConfig.getPoolingOptions(this)) - .withQueryOptions(driverConfig.getQueryOptions(this)) - .withSocketOptions(driverConfig.getSocketOptions(this)) - .build(); + clusterBuilder = Cluster.builder() + .addContactPoints(addresses) + .withPort(port) + .withProtocolVersion(driverConfig.getProtocolVersion(this)) + .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME)) + .withCompression(compression) + .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME), + getProperty(CASSANDRA_CREDENTIALS_PASSWORD)) + .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this)) + .withRetryPolicy(driverConfig.getRetryPolicy(this)) + .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this)) + .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this)) + .withMaxSchemaAgreementWaitSeconds( + parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS))) + .withPoolingOptions(driverConfig.getPoolingOptions(this)) + .withQueryOptions(driverConfig.getQueryOptions(this)) + .withSocketOptions(driverConfig.getSocketOptions(this)); + + final String runWithSSL = getProperty(CASSANDRA_WITH_SSL); + if (runWithSSL != null && runWithSSL.equals("true")) { + LOGGER.debug("Cassandra Interpreter: Using SSL"); + + try { + final SSLContext sslContext; + { + final KeyStore trustStore = KeyStore.getInstance("JKS"); + final InputStream stream = Files.newInputStream(Paths.get( + getProperty(CASSANDRA_TRUSTSTORE_PATH))); + trustStore.load(stream, getProperty(CASSANDRA_TRUSTSTORE_PASSWORD).toCharArray()); + + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, trustManagerFactory.getTrustManagers(), null); + } + clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder() + .withSSLContext(sslContext) + .build()); + } catch (Exception e) { + LOGGER.error(e.toString()); + } + } else { + LOGGER.debug("Cassandra Interpreter: Not using SSL"); + } + cluster = clusterBuilder.build(); session = cluster.connect(); helper = new InterpreterLogic(session); } diff --git a/cassandra/src/main/resources/interpreter-setting.json b/cassandra/src/main/resources/interpreter-setting.json index 3df120d985c..c878b92826f 100644 --- a/cassandra/src/main/resources/interpreter-setting.json +++ b/cassandra/src/main/resources/interpreter-setting.json @@ -189,6 +189,24 @@ "propertyName": "cassandra.socket.tcp.no_delay", "defaultValue": "true", "description": "Cassandra socket TCP no delay. Default = true" + }, + "cassandra.ssl.enabled": { + "envName": null, + "propertyName": "cassandra.ssl.enabled", + "defaultValue": "false", + "description": "Cassandra SSL" + }, + "cassandra.ssl.truststore.path": { + "envName": null, + "propertyName": "cassandra.ssl.truststore.path", + "defaultValue": "none", + "description": "Cassandra truststore path. Default = none" + }, + "cassandra.ssl.truststore.password": { + "envName": null, + "propertyName": "cassandra.ssl.truststore.password", + "defaultValue": "none", + "description": "Cassandra truststore password. Default = none" } }, "editor": {