Skip to content

Commit

Permalink
Add 'ssl.ca.pem' property (#2380)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Sep 17, 2021
1 parent 9ded5ee commit e57d4d4
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 36 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# librdkafka v1.9.0

## Enhancements

* Added `ssl.ca.pem` to add CA certificate by PEM string. (#2380)


# librdkafka v1.8.0

librdkafka v1.8.0 is a security release:
Expand Down
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ ssl.certificate.location | * | |
ssl.certificate.pem | * | | | low | Client's public key string (PEM format) used for authentication. <br>*Type: string*
ssl_certificate | * | | | low | Client's public key as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
ssl.ca.location | * | | | low | File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `ssl.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string*
ssl.ca.pem | * | | | low | CA certificate string (PEM format) for verifying the broker's key. <br>*Type: string*
ssl_ca | * | | | low | CA certificate as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
ssl.ca.certificate.stores | * | | Root | low | Comma-separated list of Windows Certificate stores to load CA certificates from. Certificates will be loaded in the same order as stores are specified. If no certificates can be loaded from any of the specified stores an error is logged and the OpenSSL library's default CA location is used instead. Store names are typically one or more of: MY, Root, Trust, CA. <br>*Type: string*
ssl.crl.location | * | | | low | Path to CRL for verifying broker's certificate validity. <br>*Type: string*
Expand Down
11 changes: 7 additions & 4 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,9 @@ class RD_EXPORT Conf {
*
* @remark Private and public keys in PEM format may also be set with the
* `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
*
* @remark CA certificate in PEM format may also be set with the
* `ssl.ca.pem` configuration property.
*/
virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
RdKafka::CertificateEncoding cert_enc,
Expand Down Expand Up @@ -1419,14 +1422,14 @@ class RD_EXPORT Conf {
*/
virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;

/**
/**
* @brief Set callback_data for ssl engine.
*
* @remark The \c ssl.engine.location configuration must be set for this
* @remark The \c ssl.engine.location configuration must be set for this
* to have affect.
*
* @remark The memory pointed to by \p value must remain valid for the
* lifetime of the configuration object and any Kafka clients that
* @remark The memory pointed to by \p value must remain valid for the
* lifetime of the configuration object and any Kafka clients that
* use it.
*
* @returns CONF_OK on success, else CONF_INVALID.
Expand Down
13 changes: 8 additions & 5 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,9 @@ typedef enum rd_kafka_cert_enc_t {
*
* @remark Private and public keys in PEM format may also be set with the
* `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
*
* @remark CA certificate in PEM format may also be set with the
* `ssl.ca.pem` configuration property.
*/
RD_EXPORT rd_kafka_conf_res_t
rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf,
Expand All @@ -2315,18 +2318,18 @@ rd_kafka_conf_set_ssl_cert (rd_kafka_conf_t *conf,
* @brief Set callback_data for OpenSSL engine.
*
* @param conf Configuration object.
* @param callback_data passed to engine callbacks,
* @param callback_data passed to engine callbacks,
* e.g. \c ENGINE_load_ssl_client_cert.
*
* @remark The \c ssl.engine.location configuration must be set for this
* @remark The \c ssl.engine.location configuration must be set for this
* to have affect.
*
* @remark The memory pointed to by \p value must remain valid for the
* lifetime of the configuration object and any Kafka clients that
* @remark The memory pointed to by \p value must remain valid for the
* lifetime of the configuration object and any Kafka clients that
* use it.
*/
RD_EXPORT
void rd_kafka_conf_set_engine_callback_data (rd_kafka_conf_t *conf,
void rd_kafka_conf_set_engine_callback_data (rd_kafka_conf_t *conf,
void *callback_data);


Expand Down
9 changes: 7 additions & 2 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"path will be used (see `OPENSSLDIR` in `openssl version -a`).",
_UNSUPPORTED_SSL
},
{ _RK_GLOBAL|_RK_SENSITIVE, "ssl.ca.pem", _RK_C_STR,
_RK(ssl.ca_pem),
"CA certificate string (PEM format) for verifying the broker's key.",
_UNSUPPORTED_SSL
},
{ _RK_GLOBAL, "ssl_ca", _RK_C_INTERNAL,
_RK(ssl.ca),
"CA certificate as set by rd_kafka_conf_set_ssl_cert()",
Expand Down Expand Up @@ -3703,8 +3708,8 @@ const char *rd_kafka_conf_finalize (rd_kafka_type_t cltype,
if (conf->ssl.keystore_location && !conf->ssl.keystore_password)
return "`ssl.keystore.password` is mandatory when "
"`ssl.keystore.location` is set";
if (conf->ssl.ca && conf->ssl.ca_location)
return "`ssl.ca.location`, and memory-based "
if (conf->ssl.ca && (conf->ssl.ca_location || conf->ssl.ca_pem))
return "`ssl.ca.location` or `ssl.ca.pem`, and memory-based "
"set_ssl_cert(CERT_CA) are mutually exclusive.";
#ifdef __APPLE__
else /* Default ssl.ca.location to 'probe' on OSX */
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ typedef enum {

/* Increase in steps of 64 as needed.
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64*28)
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64*29)

/**
* @struct rd_kafka_anyconf_t
Expand Down Expand Up @@ -238,6 +238,7 @@ struct rd_kafka_conf_s {
char *cert_pem;
rd_kafka_cert_t *cert;
char *ca_location;
char *ca_pem;
rd_kafka_cert_t *ca;
/** CSV list of Windows certificate stores */
char *ca_cert_stores;
Expand Down
84 changes: 64 additions & 20 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ static int rd_kafka_ssl_probe_and_set_default_ca_location (rd_kafka_t *rk,
*/
static int rd_kafka_ssl_set_certs (rd_kafka_t *rk, SSL_CTX *ctx,
char *errstr, size_t errstr_size) {
rd_bool_t ca_probe = rd_true;
rd_bool_t check_pkey = rd_false;
int r;

Expand All @@ -972,31 +973,74 @@ static int rd_kafka_ssl_set_certs (rd_kafka_t *rk, SSL_CTX *ctx,
/* OpenSSL takes ownership of the store */
rk->rk_conf.ssl.ca->store = NULL;

} else if (rk->rk_conf.ssl.ca_location &&
strcmp(rk->rk_conf.ssl.ca_location, "probe")) {
/* CA certificate location, either file or directory. */
int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location);
ca_probe = rd_false;

rd_kafka_dbg(rk, SECURITY, "SSL",
"Loading CA certificate(s) from %s %s",
is_dir ? "directory" : "file",
rk->rk_conf.ssl.ca_location);
} else {

r = SSL_CTX_load_verify_locations(ctx,
!is_dir ?
rk->rk_conf.ssl.
ca_location : NULL,
is_dir ?
rk->rk_conf.ssl.
ca_location : NULL);
if (rk->rk_conf.ssl.ca_location &&
strcmp(rk->rk_conf.ssl.ca_location, "probe")) {
/* CA certificate location, either file or directory. */
int is_dir = rd_kafka_path_is_dir(
rk->rk_conf.ssl.ca_location);

rd_kafka_dbg(rk, SECURITY, "SSL",
"Loading CA certificate(s) from %s %s",
is_dir ? "directory" : "file",
rk->rk_conf.ssl.ca_location);

r = SSL_CTX_load_verify_locations(ctx,
!is_dir ?
rk->rk_conf.ssl.
ca_location : NULL,
is_dir ?
rk->rk_conf.ssl.
ca_location : NULL);

if (r != 1) {
rd_snprintf(errstr, errstr_size,
"ssl.ca.location failed: ");
return -1;
if (r != 1) {
rd_snprintf(errstr, errstr_size,
"ssl.ca.location failed: ");
return -1;
}

ca_probe = rd_false;
}

} else {
if (rk->rk_conf.ssl.ca_pem) {
/* CA as PEM string */
X509 *x509;
X509_STORE *store;

/* Get the OpenSSL trust store */
store = SSL_CTX_get_cert_store(ctx);
rd_assert(store != NULL);

rd_kafka_dbg(rk, SECURITY, "SSL",
"Loading CA certificate from string");

x509 = rd_kafka_ssl_X509_from_string(
rk, rk->rk_conf.ssl.ca_pem);
if (!x509) {
rd_snprintf(errstr, errstr_size,
"ssl.ca.pem failed: "
"not in PEM format?: ");
return -1;
}

if (!X509_STORE_add_cert(store, x509)) {
rd_snprintf(errstr, errstr_size,
"failed to add ssl.ca.pem to "
"CA cert store: ");
X509_free(x509);
return -1;
}

X509_free(x509);

ca_probe = rd_false;
}
}

if (ca_probe) {
#ifdef _WIN32
/* Attempt to load CA root certificates from the
* configured Windows certificate stores. */
Expand Down
52 changes: 48 additions & 4 deletions tests/0097-ssl_verify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ static void conf_location_to_pem (RdKafka::Conf *conf,

std::string errstr;
if (conf->set(loc_prop, "", errstr) != RdKafka::Conf::CONF_OK)
Test::Fail("Failed to reset " + loc_prop);
Test::Fail("Failed to reset " + loc_prop + ": " + errstr);

/* Read file */
std::ifstream ifs(loc.c_str());
Expand All @@ -143,7 +143,7 @@ static void conf_location_to_pem (RdKafka::Conf *conf,
" from disk and changed to in-memory " + pem_prop + "\n");

if (conf->set(pem_prop, pem, errstr) != RdKafka::Conf::CONF_OK)
Test::Fail("Failed to set " + pem_prop);
Test::Fail("Failed to set " + pem_prop + ": " + errstr);
}

/**
Expand Down Expand Up @@ -257,7 +257,9 @@ static void do_test_verify (const int line, bool verify_ok,
conf_location_to_setter(conf, "ssl.certificate.location",
RdKafka::CERT_PUBLIC_KEY, pub_enc);

if (load_ca == USE_SETTER)
if (load_ca == USE_CONF)
conf_location_to_pem(conf, "ssl.ca.location", "ssl.ca.pem");
else if (load_ca == USE_SETTER)
conf_location_to_setter(conf, "ssl.ca.location",
RdKafka::CERT_CA, ca_enc);

Expand Down Expand Up @@ -376,8 +378,8 @@ extern "C" {
return 0;
}

do_test_bad_calls();

do_test_bad_calls();

do_test_verify(__LINE__, true,
USE_LOCATION, RdKafka::CERT_ENC_PEM,
Expand All @@ -393,6 +395,10 @@ extern "C" {
USE_CONF, RdKafka::CERT_ENC_PEM,
USE_CONF, RdKafka::CERT_ENC_PEM,
USE_LOCATION, RdKafka::CERT_ENC_PEM);
do_test_verify(__LINE__, true,
USE_CONF, RdKafka::CERT_ENC_PEM,
USE_CONF, RdKafka::CERT_ENC_PEM,
USE_CONF, RdKafka::CERT_ENC_PEM);
do_test_verify(__LINE__, true,
USE_SETTER, RdKafka::CERT_ENC_PEM,
USE_SETTER, RdKafka::CERT_ENC_PEM,
Expand All @@ -408,4 +414,42 @@ extern "C" {

return 0;
}


int main_0097_ssl_verify_local (int argc, char **argv) {
if (!test_check_builtin("ssl")) {
Test::Skip("Test requires SSL support\n");
return 0;
}


/* Check that creating a client with an invalid PEM string fails. */
const std::string props[] = { "ssl.ca.pem", "ssl.key.pem",
"ssl.certificate.pem", "" };

for (int i = 0 ; props[i] != "" ; i++) {
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

std::string errstr;

if (conf->set("security.protocol", "SSL", errstr))
Test::Fail(errstr);
conf->set("debug", "security", errstr);
if (conf->set(props[i], "this is \n not a \t PEM!", errstr))
Test::Fail("Setting " + props[i] + " to junk should work, "
"expecting failure on client creation");

RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (producer)
Test::Fail("Expected producer creation to fail with " + props[i] +
" set to junk");
else
Test::Say("Failed to create producer with junk " + props[i] +
" (as expected): " + errstr + "\n");
}

return 0;
}

}
2 changes: 2 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ _TEST_DECL(0093_holb_consumer);
_TEST_DECL(0094_idempotence_msg_timeout);
_TEST_DECL(0095_all_brokers_down);
_TEST_DECL(0097_ssl_verify);
_TEST_DECL(0097_ssl_verify_local);
_TEST_DECL(0098_consumer_txn);
_TEST_DECL(0099_commit_metadata);
_TEST_DECL(0100_thread_interceptors);
Expand Down Expand Up @@ -409,6 +410,7 @@ struct test tests[] = {
#endif
_TEST(0095_all_brokers_down, TEST_F_LOCAL),
_TEST(0097_ssl_verify, 0),
_TEST(0097_ssl_verify_local, TEST_F_LOCAL),
_TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)),
_TEST(0099_commit_metadata, 0),
_TEST(0100_thread_interceptors, TEST_F_LOCAL),
Expand Down

0 comments on commit e57d4d4

Please sign in to comment.