Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.zeppelin.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.Session;
import org.apache.zeppelin.interpreter.Interpreter;
Expand All @@ -29,6 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -106,6 +113,12 @@ public class CassandraInterpreter extends Interpreter {
"cassandra.socket.soLinger";
public static final String CASSANDRA_SOCKET_TCP_NO_DELAY =
"cassandra.socket.tcp.no_delay";
public static final String CASSANDRA_WITH_SSL =
"cassandra.ssl.enabled";
public static final String CASSANDRA_TRUSTSTORE_PATH =
"cassandra.ssl.truststore.path";
public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
"cassandra.ssl.truststore.password";

public static final String DEFAULT_HOST = "localhost";
public static final String DEFAULT_PORT = "9042";
Expand Down Expand Up @@ -139,10 +152,14 @@ public class CassandraInterpreter extends Interpreter {
public static final String LOGGING_DEFAULT_RETRY = "LOGGING_DEFAULT";
public static final String LOGGING_DOWNGRADING_RETRY = "LOGGING_DOWNGRADING";
public static final String LOGGING_FALLTHROUGH_RETRY = "LOGGING_FALLTHROUGH";
public static final String DEFAULT_WITH_SSL = "false";
public static final String DEFAULT_TRUSTSTORE_PATH = "none";
public static final String DEFAULT_TRUSTSTORE_PASSWORD = "none";

public static final List NO_COMPLETION = new ArrayList<>();

InterpreterLogic helper;
Cluster.Builder clusterBuilder;
Cluster cluster;
Session session;
private JavaDriverConfig driverConfig = new JavaDriverConfig();
Expand Down Expand Up @@ -253,6 +270,12 @@ public CassandraInterpreter(Properties properties) {
"Cassandra socket read timeout in millisecs. Default = 12000")
.add(CASSANDRA_SOCKET_TCP_NO_DELAY, DEFAULT_TCP_NO_DELAY,
"Cassandra socket TCP no delay. Default = true")
.add(CASSANDRA_WITH_SSL, DEFAULT_WITH_SSL,
"Cassandra SSL")
.add(CASSANDRA_TRUSTSTORE_PATH, DEFAULT_TRUSTSTORE_PATH,
"Cassandra truststore path. Default = none")
.add(CASSANDRA_TRUSTSTORE_PASSWORD, DEFAULT_TRUSTSTORE_PASSWORD,
"Cassandra truststore password. Default = none")
.build());
}

Expand All @@ -271,7 +294,7 @@ public void open() {

Compression compression = driverConfig.getCompressionProtocol(this);

cluster = Cluster.builder()
clusterBuilder = Cluster.builder()
.addContactPoints(addresses)
.withPort(port)
.withProtocolVersion(driverConfig.getProtocolVersion(this))
Expand All @@ -287,8 +310,36 @@ public void open() {
parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
.withPoolingOptions(driverConfig.getPoolingOptions(this))
.withQueryOptions(driverConfig.getQueryOptions(this))
.withSocketOptions(driverConfig.getSocketOptions(this))
.build();
.withSocketOptions(driverConfig.getSocketOptions(this));

if (getProperty(CASSANDRA_WITH_SSL).equals("true")) {
LOGGER.debug("Cassandra Interpreter: Using SSL");

try {
final SSLContext sslContext;
{
final KeyStore trustStore = KeyStore.getInstance("JKS");
final InputStream stream = Files.newInputStream(Paths.get(
getProperty(CASSANDRA_TRUSTSTORE_PATH)));
trustStore.load(stream, getProperty(CASSANDRA_TRUSTSTORE_PASSWORD).toCharArray());

final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);

sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
}
clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder().withSSLContext(
sslContext).build());
} catch (Exception e) {
LOGGER.error(e.toString());
}
} else {
LOGGER.debug("Cassandra Interpreter: Not using SSL");
}

cluster = clusterBuilder.build();

session = cluster.connect();
helper = new InterpreterLogic(session);
Expand Down