Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>security</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -70,6 +71,11 @@ public class CassandraClientConfig
private int speculativeExecutionLimit = 1;
private Duration speculativeExecutionDelay = new Duration(500, MILLISECONDS);
private ProtocolVersion protocolVersion = ProtocolVersion.V3;
private boolean tlsEnabled;
private File truststorePath;
private String truststorePassword;
private File keystorePath;
private String keystorePassword;

@NotNull
@Size(min = 1)
Expand Down Expand Up @@ -407,4 +413,66 @@ public CassandraClientConfig setProtocolVersion(ProtocolVersion version)
this.protocolVersion = version;
return this;
}

public boolean isTlsEnabled()
{
return tlsEnabled;
}

@Config("cassandra.tls.enabled")
public CassandraClientConfig setTlsEnabled(boolean tlsEnabled)
{
this.tlsEnabled = tlsEnabled;
return this;
}

public Optional<File> getKeystorePath()
{
return Optional.ofNullable(keystorePath);
}

@Config("cassandra.tls.keystore-path")
public CassandraClientConfig setKeystorePath(File keystorePath)
{
this.keystorePath = keystorePath;
return this;
}

public Optional<String> getKeystorePassword()
{
return Optional.ofNullable(keystorePassword);
}

@Config("cassandra.tls.keystore-password")
@ConfigSecuritySensitive
public CassandraClientConfig setKeystorePassword(String keystorePassword)
{
this.keystorePassword = keystorePassword;
return this;
}

public Optional<File> getTruststorePath()
{
return Optional.ofNullable(truststorePath);
}

@Config("cassandra.tls.truststore-path")
public CassandraClientConfig setTruststorePath(File truststorePath)
{
this.truststorePath = truststorePath;
return this;
}

public Optional<String> getTruststorePassword()
{
return Optional.ofNullable(truststorePassword);
}

@Config("cassandra.tls.truststore-password")
@ConfigSecuritySensitive
public CassandraClientConfig setTruststorePassword(String truststorePassword)
{
this.truststorePassword = truststorePassword;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
Expand All @@ -24,6 +25,7 @@
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.cassandra.util.SslContextProvider;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
Expand Down Expand Up @@ -124,6 +126,12 @@ public static CassandraSession createCassandraSession(
if (config.getClientSoLinger() != null) {
socketOptions.setSoLinger(config.getClientSoLinger());
}
if (config.isTlsEnabled()) {
SslContextProvider sslContextProvider = new SslContextProvider(config.getKeystorePath(),
config.getKeystorePassword(), config.getTruststorePath(), config.getTruststorePassword());
sslContextProvider.buildSslContext().ifPresent(context ->
clusterBuilder.withSSL(JdkSSLOptions.builder().withSSLContext(context).build()));
}
clusterBuilder.withSocketOptions(socketOptions);

if (config.getUsername() != null && config.getPassword() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
public enum CassandraErrorCode
implements ErrorCodeSupplier
{
CASSANDRA_METADATA_ERROR(0, EXTERNAL), CASSANDRA_VERSION_ERROR(1, EXTERNAL);
CASSANDRA_METADATA_ERROR(0, EXTERNAL),
CASSANDRA_VERSION_ERROR(1, EXTERNAL),
CASSANDRA_SSL_INITIALIZATION_FAILURE(2, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cassandra.util;

import com.facebook.presto.spi.PrestoException;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import javax.security.auth.x500.X500Principal;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static com.facebook.airlift.security.pem.PemReader.loadKeyStore;
import static com.facebook.airlift.security.pem.PemReader.readCertificateChain;
import static com.facebook.presto.cassandra.CassandraErrorCode.CASSANDRA_SSL_INITIALIZATION_FAILURE;
import static java.util.Collections.list;

public class SslContextProvider
{
private final Optional<File> keystorePath;
private final Optional<String> keystorePassword;
private final Optional<File> truststorePath;
private final Optional<String> truststorePassword;

public SslContextProvider(
Optional<File> keystorePath,
Optional<String> keystorePassword,
Optional<File> truststorePath,
Optional<String> truststorePassword)
{
this.keystorePath = keystorePath;
this.keystorePassword = keystorePassword;
this.truststorePath = truststorePath;
this.truststorePassword = truststorePassword;
}

public Optional<SSLContext> buildSslContext()
{
if (!keystorePath.isPresent() && !truststorePath.isPresent()) {
return Optional.empty();
}
try {
// load KeyStore if configured and get KeyManagers
KeyStore keystore = null;
KeyManager[] keyManagers = null;
if (keystorePath.isPresent()) {
char[] keyManagerPassword;
try {
// attempt to read the key store as a PEM file
keystore = loadKeyStore(keystorePath.get(), keystorePath.get(), keystorePassword);
// for PEM encoded keys, the password is used to decrypt the specific key (and does not
// protect the keystore itself)
keyManagerPassword = new char[0];
}
catch (IOException | GeneralSecurityException ignored) {
keyManagerPassword = keystorePassword.map(String::toCharArray).orElse(null);
keystore = KeyStore.getInstance(KeyStore.getDefaultType());
try (InputStream in = new FileInputStream(keystorePath.get())) {
keystore.load(in, keyManagerPassword);
}
}
validateCertificates(keystore);
KeyManagerFactory keyManagerFactory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keystore, keyManagerPassword);
keyManagers = keyManagerFactory.getKeyManagers();
}
// load TrustStore if configured, otherwise use KeyStore
KeyStore truststore = keystore;
if (truststorePath.isPresent()) {
truststore = loadTrustStore(truststorePath.get(), truststorePassword);
}

// create TrustManagerFactory
TrustManagerFactory trustManagerFactory =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(truststore);

// get X509TrustManager
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
if ((trustManagers.length != 1) || !(trustManagers[0] instanceof X509TrustManager)) {
throw new RuntimeException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
}

X509TrustManager trustManager = (X509TrustManager) trustManagers[0];
// create SSLContext
SSLContext result = SSLContext.getInstance("SSL");
result.init(keyManagers, new TrustManager[] {trustManager}, null);
return Optional.of(result);
}
catch (GeneralSecurityException | IOException e) {
throw new PrestoException(CASSANDRA_SSL_INITIALIZATION_FAILURE, e);
}
}

public KeyStore loadTrustStore(File trustStorePath, Optional<String> trustStorePassword)
throws IOException, GeneralSecurityException
{
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
try {
// attempt to read the trust store as a PEM file
List<X509Certificate> certificateChain = readCertificateChain(trustStorePath);
if (!certificateChain.isEmpty()) {
trustStore.load(null, null);
for (X509Certificate certificate : certificateChain) {
X500Principal principal = certificate.getSubjectX500Principal();
trustStore.setCertificateEntry(principal.getName(), certificate);
}
return trustStore;
}
}
catch (IOException | GeneralSecurityException ignored) {
}
try (InputStream inputStream = new FileInputStream(trustStorePath)) {
trustStore.load(inputStream, trustStorePassword.map(String::toCharArray).orElse(null));
}
return trustStore;
}

public void validateCertificates(KeyStore keyStore) throws GeneralSecurityException
{
for (String alias : list(keyStore.aliases())) {
if (!keyStore.isKeyEntry(alias)) {
continue;
}
Certificate certificate = keyStore.getCertificate(alias);
if (!(certificate instanceof X509Certificate)) {
continue;
}
try {
((X509Certificate) certificate).checkValidity();
}
catch (CertificateExpiredException e) {
throw new CertificateExpiredException("KeyStore certificate is expired: " + e.getMessage());
}
catch (CertificateNotYetValidException e) {
throw new CertificateNotYetValidException("KeyStore certificate is not yet valid: " + e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.io.File;
import java.util.Map;

import static com.datastax.driver.core.ProtocolVersion.V2;
Expand Down Expand Up @@ -59,7 +60,12 @@ public void testDefaults()
.setNoHostAvailableRetryTimeout(new Duration(1, MINUTES))
.setSpeculativeExecutionLimit(1)
.setSpeculativeExecutionDelay(new Duration(500, MILLISECONDS))
.setProtocolVersion(V3));
.setProtocolVersion(V3)
.setKeystorePath(null)
.setKeystorePassword(null)
.setTruststorePath(null)
.setTruststorePassword(null)
.setTlsEnabled(false));
}

@Test
Expand Down Expand Up @@ -92,6 +98,11 @@ public void testExplicitPropertyMappings()
.put("cassandra.speculative-execution.limit", "10")
.put("cassandra.speculative-execution.delay", "101s")
.put("cassandra.protocol-version", "V2")
.put("cassandra.tls.enabled", "true")
.put("cassandra.tls.keystore-path", "/tmp/keystore")
.put("cassandra.tls.keystore-password", "keystore-password")
.put("cassandra.tls.truststore-path", "/tmp/truststore")
.put("cassandra.tls.truststore-password", "truststore-password")
.build();

CassandraClientConfig expected = new CassandraClientConfig()
Expand Down Expand Up @@ -120,7 +131,12 @@ public void testExplicitPropertyMappings()
.setNoHostAvailableRetryTimeout(new Duration(3, MINUTES))
.setSpeculativeExecutionLimit(10)
.setSpeculativeExecutionDelay(new Duration(101, SECONDS))
.setProtocolVersion(V2);
.setProtocolVersion(V2)
.setTlsEnabled(true)
.setKeystorePath(new File("/tmp/keystore"))
.setKeystorePassword("keystore-password")
.setTruststorePath(new File("/tmp/truststore"))
.setTruststorePassword("truststore-password");

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
10 changes: 10 additions & 0 deletions presto-docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ Property Name Description
``cassandra.speculative-execution.limit`` The number of speculative executions (defaults to ``1``).

``cassandra.speculative-execution.delay`` The delay between each speculative execution (defaults to ``500ms``).

``cassandra.tls.enabled`` Whether TLS security is enabled (defaults to ``false``).

``cassandra.tls.keystore-path`` Path to the PEM or JKS key store.

``cassandra.tls.truststore-path`` Path to the PEM or JKS trust store.

``cassandra.tls.keystore-password`` Password for the key store.

``cassandra.tls.truststore-password`` Password for the trust store.
============================================================= ======================================================================

Querying Cassandra Tables
Expand Down