Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,28 @@
package org.apache.zeppelin.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.Session;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
import static java.lang.Integer.parseInt;

/**
Expand All @@ -43,8 +48,8 @@ public class CassandraInterpreter extends Interpreter {

private static final Logger LOGGER = LoggerFactory.getLogger(CassandraInterpreter.class);

public static final String CASSANDRA_INTERPRETER_PARALLELISM = "cassandra.interpreter" +
".parallelism";
public static final String CASSANDRA_INTERPRETER_PARALLELISM =
"cassandra.interpreter.parallelism";
public static final String CASSANDRA_HOSTS = "cassandra.hosts";
public static final String CASSANDRA_PORT = "cassandra.native.port";
public static final String CASSANDRA_PROTOCOL_VERSION = "cassandra.protocol.version";
Expand All @@ -59,21 +64,21 @@ public class CassandraInterpreter extends Interpreter {
public static final String CASSANDRA_SPECULATIVE_EXECUTION_POLICY =
"cassandra.speculative.execution.policy";
public static final String CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS =
"cassandra.max.schema.agreement.wait.second";
"cassandra.max.schema.agreement.wait.second";
public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL =
"cassandra.pooling.new.connection.threshold.local";
"cassandra.pooling.new.connection.threshold.local";
public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE =
"cassandra.pooling.new.connection.threshold.remote";
"cassandra.pooling.new.connection.threshold.remote";
public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL =
"cassandra.pooling.max.connection.per.host.local";
"cassandra.pooling.max.connection.per.host.local";
public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE =
"cassandra.pooling.max.connection.per.host.remote";
"cassandra.pooling.max.connection.per.host.remote";
public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL =
"cassandra.pooling.core.connection.per.host.local";
public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE =
"cassandra.pooling.core.connection.per.host.remote";
public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL =
"cassandra.pooling.max.request.per.connection.local";
"cassandra.pooling.max.request.per.connection.local";
public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE =
"cassandra.pooling.max.request.per.connection.remote";
public static final String CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS =
Expand Down Expand Up @@ -106,6 +111,13 @@ public class CassandraInterpreter extends Interpreter {
"cassandra.socket.soLinger";
public static final String CASSANDRA_SOCKET_TCP_NO_DELAY =
"cassandra.socket.tcp.no_delay";
public static final String CASSANDRA_WITH_SSL =
"cassandra.ssl.enabled";
public static final String CASSANDRA_TRUSTSTORE_PATH =
"cassandra.ssl.truststore.path";
public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
"cassandra.ssl.truststore.password";


public static final String DEFAULT_HOST = "localhost";
public static final String DEFAULT_PORT = "9042";
Expand Down Expand Up @@ -143,6 +155,7 @@ public class CassandraInterpreter extends Interpreter {
public static final List NO_COMPLETION = new ArrayList<>();

InterpreterLogic helper;
Cluster.Builder clusterBuilder;
Cluster cluster;
Session session;
private JavaDriverConfig driverConfig = new JavaDriverConfig();
Expand All @@ -162,29 +175,58 @@ public void open() {
}

LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + hosts.toString() +
"on port " + port);
"on port " + port);

Compression compression = driverConfig.getCompressionProtocol(this);

cluster = Cluster.builder()
.addContactPoints(addresses)
.withPort(port)
.withProtocolVersion(driverConfig.getProtocolVersion(this))
.withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
.withCompression(compression)
.withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
.withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
.withRetryPolicy(driverConfig.getRetryPolicy(this))
.withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
.withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
.withMaxSchemaAgreementWaitSeconds(
parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
.withPoolingOptions(driverConfig.getPoolingOptions(this))
.withQueryOptions(driverConfig.getQueryOptions(this))
.withSocketOptions(driverConfig.getSocketOptions(this))
.build();
clusterBuilder = Cluster.builder()
.addContactPoints(addresses)
.withPort(port)
.withProtocolVersion(driverConfig.getProtocolVersion(this))
.withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
.withCompression(compression)
.withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
.withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
.withRetryPolicy(driverConfig.getRetryPolicy(this))
.withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
.withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
.withMaxSchemaAgreementWaitSeconds(
parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
.withPoolingOptions(driverConfig.getPoolingOptions(this))
.withQueryOptions(driverConfig.getQueryOptions(this))
.withSocketOptions(driverConfig.getSocketOptions(this));

final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
if (runWithSSL != null && runWithSSL.equals("true")) {
LOGGER.debug("Cassandra Interpreter: Using SSL");

try {
final SSLContext sslContext;
{
final KeyStore trustStore = KeyStore.getInstance("JKS");
final InputStream stream = Files.newInputStream(Paths.get(
getProperty(CASSANDRA_TRUSTSTORE_PATH)));
trustStore.load(stream, getProperty(CASSANDRA_TRUSTSTORE_PASSWORD).toCharArray());

final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);

sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
}
clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder()
.withSSLContext(sslContext)
.build());
} catch (Exception e) {
LOGGER.error(e.toString());
}
} else {
LOGGER.debug("Cassandra Interpreter: Not using SSL");
}

cluster = clusterBuilder.build();
session = cluster.connect();
helper = new InterpreterLogic(session);
}
Expand Down
18 changes: 18 additions & 0 deletions cassandra/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@
"propertyName": "cassandra.socket.tcp.no_delay",
"defaultValue": "true",
"description": "Cassandra socket TCP no delay. Default = true"
},
"cassandra.ssl.enabled": {
"envName": null,
"propertyName": "cassandra.ssl.enabled",
"defaultValue": "false",
"description": "Cassandra SSL"
},
"cassandra.ssl.truststore.path": {
"envName": null,
"propertyName": "cassandra.ssl.truststore.path",
"defaultValue": "none",
"description": "Cassandra truststore path. Default = none"
},
"cassandra.ssl.truststore.password": {
"envName": null,
"propertyName": "cassandra.ssl.truststore.password",
"defaultValue": "none",
"description": "Cassandra truststore password. Default = none"
}
},
"editor": {
Expand Down