Skip to content

Commit

Permalink
Add support of TLS auth to NATS JetStream sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathijs van den Worm authored and jpechane committed Jun 6, 2024
1 parent 789e112 commit 8bb3f93
Showing 1 changed file with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
*/
package io.debezium.server.nats.jetstream;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.List;
import java.util.Optional;

Expand All @@ -15,6 +19,10 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand Down Expand Up @@ -57,6 +65,9 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
private static final String PROP_AUTH_SEED = PROP_PREFIX + "auth.seed";
private static final String PROP_AUTH_USER = PROP_PREFIX + "auth.user";
private static final String PROP_AUTH_PASSWORD = PROP_PREFIX + "auth.password";
private static final String PROP_AUTH_TLS_KEYSTORE = PROP_PREFIX + "auth.tls.keystore";
private static final String PROP_AUTH_TLS_KEYSTORE_PASSWORD = PROP_PREFIX + "auth.tls.keystore.password";
private static final String PROP_AUTH_TLS_PASSWORD = PROP_PREFIX + "auth.tls.password";

private Connection nc;
private JetStream js;
Expand All @@ -76,6 +87,15 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
@ConfigProperty(name = PROP_AUTH_PASSWORD)
Optional<String> password;

@ConfigProperty(name = PROP_AUTH_TLS_KEYSTORE)
Optional<String> tlsKeyStore;

@ConfigProperty(name = PROP_AUTH_TLS_KEYSTORE_PASSWORD)
Optional<String> tlsKeyStorePassword;

@ConfigProperty(name = PROP_AUTH_TLS_PASSWORD)
Optional<String> tlsPassword;

@Inject
@CustomConsumerBuilder
Instance<JetStream> customStreamingConnection;
Expand Down Expand Up @@ -105,6 +125,10 @@ void connect() {
else if (user.isPresent() && password.isPresent()) {
natsOptionsBuilder.userInfo(user.get(), password.get());
}
else if (tlsKeyStore.isPresent() && tlsKeyStorePassword.isPresent() && tlsPassword.isPresent()) {
var ctx = sslAuthContext(tlsKeyStore.get(), tlsKeyStorePassword.get(), tlsPassword.get());
natsOptionsBuilder.sslContext(ctx);
}

nc = Nats.connect(natsOptionsBuilder.build());

Expand Down Expand Up @@ -170,4 +194,25 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
}
committer.markBatchFinished();
}

private static SSLContext sslAuthContext(String keystorePath, String keystorePassword,
String password)
throws Exception {

var keystore = KeyStore.getInstance(KeyStore.getDefaultType());
try (var in = new BufferedInputStream(new FileInputStream(keystorePath))) {
keystore.load(in, keystorePassword.toCharArray());
}

var kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keystore, password.toCharArray());

var tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keystore);

var ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

return ctx;
}
}

0 comments on commit 8bb3f93

Please sign in to comment.