diff --git a/presto-cassandra/pom.xml b/presto-cassandra/pom.xml
index 208d251d0adab..17e089c46afa6 100644
--- a/presto-cassandra/pom.xml
+++ b/presto-cassandra/pom.xml
@@ -166,6 +166,11 @@
testing
test
+
+
+ com.facebook.airlift
+ security
+
diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java
index 421ebe65f8ada..203e8e3b84f35 100644
--- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java
+++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientConfig.java
@@ -30,6 +30,7 @@
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
+import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -70,6 +71,11 @@ public class CassandraClientConfig
private int speculativeExecutionLimit = 1;
private Duration speculativeExecutionDelay = new Duration(500, MILLISECONDS);
private ProtocolVersion protocolVersion = ProtocolVersion.V3;
+ private boolean tlsEnabled;
+ private File truststorePath;
+ private String truststorePassword;
+ private File keystorePath;
+ private String keystorePassword;
@NotNull
@Size(min = 1)
@@ -407,4 +413,66 @@ public CassandraClientConfig setProtocolVersion(ProtocolVersion version)
this.protocolVersion = version;
return this;
}
+
+ public boolean isTlsEnabled()
+ {
+ return tlsEnabled;
+ }
+
+ @Config("cassandra.tls.enabled")
+ public CassandraClientConfig setTlsEnabled(boolean tlsEnabled)
+ {
+ this.tlsEnabled = tlsEnabled;
+ return this;
+ }
+
+ public Optional getKeystorePath()
+ {
+ return Optional.ofNullable(keystorePath);
+ }
+
+ @Config("cassandra.tls.keystore-path")
+ public CassandraClientConfig setKeystorePath(File keystorePath)
+ {
+ this.keystorePath = keystorePath;
+ return this;
+ }
+
+ public Optional getKeystorePassword()
+ {
+ return Optional.ofNullable(keystorePassword);
+ }
+
+ @Config("cassandra.tls.keystore-password")
+ @ConfigSecuritySensitive
+ public CassandraClientConfig setKeystorePassword(String keystorePassword)
+ {
+ this.keystorePassword = keystorePassword;
+ return this;
+ }
+
+ public Optional getTruststorePath()
+ {
+ return Optional.ofNullable(truststorePath);
+ }
+
+ @Config("cassandra.tls.truststore-path")
+ public CassandraClientConfig setTruststorePath(File truststorePath)
+ {
+ this.truststorePath = truststorePath;
+ return this;
+ }
+
+ public Optional getTruststorePassword()
+ {
+ return Optional.ofNullable(truststorePassword);
+ }
+
+ @Config("cassandra.tls.truststore-password")
+ @ConfigSecuritySensitive
+ public CassandraClientConfig setTruststorePassword(String truststorePassword)
+ {
+ this.truststorePassword = truststorePassword;
+ return this;
+ }
}
diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java
index d2990a32bb800..364779296f020 100644
--- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java
+++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java
@@ -14,6 +14,7 @@
package com.facebook.presto.cassandra;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
@@ -24,6 +25,7 @@
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
import com.facebook.airlift.json.JsonCodec;
+import com.facebook.presto.cassandra.util.SslContextProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
@@ -124,6 +126,12 @@ public static CassandraSession createCassandraSession(
if (config.getClientSoLinger() != null) {
socketOptions.setSoLinger(config.getClientSoLinger());
}
+ if (config.isTlsEnabled()) {
+ SslContextProvider sslContextProvider = new SslContextProvider(config.getKeystorePath(),
+ config.getKeystorePassword(), config.getTruststorePath(), config.getTruststorePassword());
+ sslContextProvider.buildSslContext().ifPresent(context ->
+ clusterBuilder.withSSL(JdkSSLOptions.builder().withSSLContext(context).build()));
+ }
clusterBuilder.withSocketOptions(socketOptions);
if (config.getUsername() != null && config.getPassword() != null) {
diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraErrorCode.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraErrorCode.java
index 70d415ebebba8..80fa5112e6a7f 100644
--- a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraErrorCode.java
+++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraErrorCode.java
@@ -22,7 +22,9 @@
public enum CassandraErrorCode
implements ErrorCodeSupplier
{
- CASSANDRA_METADATA_ERROR(0, EXTERNAL), CASSANDRA_VERSION_ERROR(1, EXTERNAL);
+ CASSANDRA_METADATA_ERROR(0, EXTERNAL),
+ CASSANDRA_VERSION_ERROR(1, EXTERNAL),
+ CASSANDRA_SSL_INITIALIZATION_FAILURE(2, EXTERNAL);
private final ErrorCode errorCode;
diff --git a/presto-cassandra/src/main/java/com/facebook/presto/cassandra/util/SslContextProvider.java b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/util/SslContextProvider.java
new file mode 100644
index 0000000000000..285029a5d6158
--- /dev/null
+++ b/presto-cassandra/src/main/java/com/facebook/presto/cassandra/util/SslContextProvider.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.cassandra.util;
+
+import com.facebook.presto.spi.PrestoException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import javax.security.auth.x500.X500Principal;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateExpiredException;
+import java.security.cert.CertificateNotYetValidException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static com.facebook.airlift.security.pem.PemReader.loadKeyStore;
+import static com.facebook.airlift.security.pem.PemReader.readCertificateChain;
+import static com.facebook.presto.cassandra.CassandraErrorCode.CASSANDRA_SSL_INITIALIZATION_FAILURE;
+import static java.util.Collections.list;
+
+public class SslContextProvider
+{
+ private final Optional keystorePath;
+ private final Optional keystorePassword;
+ private final Optional truststorePath;
+ private final Optional truststorePassword;
+
+ public SslContextProvider(
+ Optional keystorePath,
+ Optional keystorePassword,
+ Optional truststorePath,
+ Optional truststorePassword)
+ {
+ this.keystorePath = keystorePath;
+ this.keystorePassword = keystorePassword;
+ this.truststorePath = truststorePath;
+ this.truststorePassword = truststorePassword;
+ }
+
+ public Optional buildSslContext()
+ {
+ if (!keystorePath.isPresent() && !truststorePath.isPresent()) {
+ return Optional.empty();
+ }
+ try {
+ // load KeyStore if configured and get KeyManagers
+ KeyStore keystore = null;
+ KeyManager[] keyManagers = null;
+ if (keystorePath.isPresent()) {
+ char[] keyManagerPassword;
+ try {
+ // attempt to read the key store as a PEM file
+ keystore = loadKeyStore(keystorePath.get(), keystorePath.get(), keystorePassword);
+ // for PEM encoded keys, the password is used to decrypt the specific key (and does not
+ // protect the keystore itself)
+ keyManagerPassword = new char[0];
+ }
+ catch (IOException | GeneralSecurityException ignored) {
+ keyManagerPassword = keystorePassword.map(String::toCharArray).orElse(null);
+ keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream in = new FileInputStream(keystorePath.get())) {
+ keystore.load(in, keyManagerPassword);
+ }
+ }
+ validateCertificates(keystore);
+ KeyManagerFactory keyManagerFactory =
+ KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keystore, keyManagerPassword);
+ keyManagers = keyManagerFactory.getKeyManagers();
+ }
+ // load TrustStore if configured, otherwise use KeyStore
+ KeyStore truststore = keystore;
+ if (truststorePath.isPresent()) {
+ truststore = loadTrustStore(truststorePath.get(), truststorePassword);
+ }
+
+ // create TrustManagerFactory
+ TrustManagerFactory trustManagerFactory =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(truststore);
+
+ // get X509TrustManager
+ TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+ if ((trustManagers.length != 1) || !(trustManagers[0] instanceof X509TrustManager)) {
+ throw new RuntimeException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
+ }
+
+ X509TrustManager trustManager = (X509TrustManager) trustManagers[0];
+ // create SSLContext
+ SSLContext result = SSLContext.getInstance("SSL");
+ result.init(keyManagers, new TrustManager[] {trustManager}, null);
+ return Optional.of(result);
+ }
+ catch (GeneralSecurityException | IOException e) {
+ throw new PrestoException(CASSANDRA_SSL_INITIALIZATION_FAILURE, e);
+ }
+ }
+
+ public KeyStore loadTrustStore(File trustStorePath, Optional trustStorePassword)
+ throws IOException, GeneralSecurityException
+ {
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try {
+ // attempt to read the trust store as a PEM file
+ List certificateChain = readCertificateChain(trustStorePath);
+ if (!certificateChain.isEmpty()) {
+ trustStore.load(null, null);
+ for (X509Certificate certificate : certificateChain) {
+ X500Principal principal = certificate.getSubjectX500Principal();
+ trustStore.setCertificateEntry(principal.getName(), certificate);
+ }
+ return trustStore;
+ }
+ }
+ catch (IOException | GeneralSecurityException ignored) {
+ }
+ try (InputStream inputStream = new FileInputStream(trustStorePath)) {
+ trustStore.load(inputStream, trustStorePassword.map(String::toCharArray).orElse(null));
+ }
+ return trustStore;
+ }
+
+ public void validateCertificates(KeyStore keyStore) throws GeneralSecurityException
+ {
+ for (String alias : list(keyStore.aliases())) {
+ if (!keyStore.isKeyEntry(alias)) {
+ continue;
+ }
+ Certificate certificate = keyStore.getCertificate(alias);
+ if (!(certificate instanceof X509Certificate)) {
+ continue;
+ }
+ try {
+ ((X509Certificate) certificate).checkValidity();
+ }
+ catch (CertificateExpiredException e) {
+ throw new CertificateExpiredException("KeyStore certificate is expired: " + e.getMessage());
+ }
+ catch (CertificateNotYetValidException e) {
+ throw new CertificateNotYetValidException("KeyStore certificate is not yet valid: " + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java
index 331aba77a2504..7b6ec6e476abb 100644
--- a/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java
+++ b/presto-cassandra/src/test/java/com/facebook/presto/cassandra/TestCassandraClientConfig.java
@@ -20,6 +20,7 @@
import io.airlift.units.Duration;
import org.testng.annotations.Test;
+import java.io.File;
import java.util.Map;
import static com.datastax.driver.core.ProtocolVersion.V2;
@@ -59,7 +60,12 @@ public void testDefaults()
.setNoHostAvailableRetryTimeout(new Duration(1, MINUTES))
.setSpeculativeExecutionLimit(1)
.setSpeculativeExecutionDelay(new Duration(500, MILLISECONDS))
- .setProtocolVersion(V3));
+ .setProtocolVersion(V3)
+ .setKeystorePath(null)
+ .setKeystorePassword(null)
+ .setTruststorePath(null)
+ .setTruststorePassword(null)
+ .setTlsEnabled(false));
}
@Test
@@ -92,6 +98,11 @@ public void testExplicitPropertyMappings()
.put("cassandra.speculative-execution.limit", "10")
.put("cassandra.speculative-execution.delay", "101s")
.put("cassandra.protocol-version", "V2")
+ .put("cassandra.tls.enabled", "true")
+ .put("cassandra.tls.keystore-path", "/tmp/keystore")
+ .put("cassandra.tls.keystore-password", "keystore-password")
+ .put("cassandra.tls.truststore-path", "/tmp/truststore")
+ .put("cassandra.tls.truststore-password", "truststore-password")
.build();
CassandraClientConfig expected = new CassandraClientConfig()
@@ -120,7 +131,12 @@ public void testExplicitPropertyMappings()
.setNoHostAvailableRetryTimeout(new Duration(3, MINUTES))
.setSpeculativeExecutionLimit(10)
.setSpeculativeExecutionDelay(new Duration(101, SECONDS))
- .setProtocolVersion(V2);
+ .setProtocolVersion(V2)
+ .setTlsEnabled(true)
+ .setKeystorePath(new File("/tmp/keystore"))
+ .setKeystorePassword("keystore-password")
+ .setTruststorePath(new File("/tmp/truststore"))
+ .setTruststorePassword("truststore-password");
ConfigAssertions.assertFullMapping(properties, expected);
}
diff --git a/presto-docs/src/main/sphinx/connector/cassandra.rst b/presto-docs/src/main/sphinx/connector/cassandra.rst
index 1700be1f1a761..153aaad18cba9 100644
--- a/presto-docs/src/main/sphinx/connector/cassandra.rst
+++ b/presto-docs/src/main/sphinx/connector/cassandra.rst
@@ -144,6 +144,16 @@ Property Name Description
``cassandra.speculative-execution.limit`` The number of speculative executions (defaults to ``1``).
``cassandra.speculative-execution.delay`` The delay between each speculative execution (defaults to ``500ms``).
+
+``cassandra.tls.enabled`` Whether TLS security is enabled (defaults to ``false``).
+
+``cassandra.tls.keystore-path`` Path to the PEM or JKS key store.
+
+``cassandra.tls.truststore-path`` Path to the PEM or JKS trust store.
+
+``cassandra.tls.keystore-password`` Password for the key store.
+
+``cassandra.tls.truststore-password`` Password for the trust store.
============================================================= ======================================================================
Querying Cassandra Tables