Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve jwt token from token provider #3560

Merged
merged 11 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ librdkafka also provides a native C++ interface.
- [Threads and callbacks](#threads-and-callbacks)
- [Brokers](#brokers)
- [SSL](#ssl)
- [OAUTHBEARER with Support for OIDC](#oauthbearer-with-support-for-oidc)
- [Sparse connections](#sparse-connections)
- [Random broker selection](#random-broker-selection)
- [Persistent broker connections](#persistent-broker-connections)
Expand Down Expand Up @@ -1121,6 +1122,31 @@ For example, to read both intermediate and root CAs, set
`ssl.ca.certificate.stores=CA,Root`.


#### OAUTHBEARER with Support for OIDC

Oauthbearer with OIDC is another way for the client to connect to a broker's
SASL endpoints/listeners. To use this method the client needs to be
configured with `security.protocol=SASL_SSL` for SASL authentication
and SSL transport, and `sasl.oauthbearer.method=OIDC` to use
OIDC with OAUTHBEARER.

OAUTHBEARER with OIDC will also require configuration of the
following configuration properties:

* `sasl.oauthbearer.token.endpoint.url` - OAUTH issuer token endpoint HTTP(S)
URI used to retrieve the token.
* `sasl.oauthbearer.client.id` - A public identifier for the application.
It must be unique across all clients that the authorization server handles.
* `sasl.oauthbearer.client.secret` - This is only known to the application
and the authorization server. This should be a sufficiently random string
that is not guessable.
* `sasl.oauthbearer.scope` - Client use this to specify the scope of the
access request to the broker.
* `sasl.oauthbearer.extensions` - Allow additional information to be provided
to the broker. It's a comma-separated list of key=value pairs.
For example:
`supportFeatureX=true,organizationId=sales-emea`


#### Sparse connections

Expand Down
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ if(WITH_SASL_OAUTHBEARER)
list(APPEND sources rdkafka_sasl_oauthbearer.c)
endif()

if(WITH_CURL)
list(APPEND sources rdkafka_sasl_oauthbearer_oidc.c)
endif()

if(WITH_ZLIB)
list(APPEND sources rdgz.c)
endif()
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SRCS_$(WITH_ZSTD) += rdkafka_zstd.c
SRCS_$(WITH_HDRHISTOGRAM) += rdhdrhistogram.c
SRCS_$(WITH_SSL) += rdkafka_ssl.c
SRCS_$(WITH_CURL) += rdhttp.c
SRCS_$(WITH_CURL) += rdkafka_sasl_oauthbearer_oidc.c

SRCS_LZ4 = rdxxhash.c
ifneq ($(WITH_LZ4_EXT), y)
Expand Down
145 changes: 145 additions & 0 deletions src/rdhttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,151 @@ rd_http_error_t *rd_http_get(const char *url, rd_buf_t **rbufp) {
}


/**
* @brief Extract the JSON object from \p hreq and return it in \p *jsonp.
*
* @returns Returns NULL on success, or an JSON parsing error - this
* error object must be destroyed by calling rd_http_error_destroy().
*/
rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp) {
size_t len;
char *raw_json;
const char *end = NULL;
rd_slice_t slice;
rd_http_error_t *herr = NULL;

/* cJSON requires the entire input to parse in contiguous memory. */
rd_slice_init_full(&slice, hreq->hreq_buf);
len = rd_buf_len(hreq->hreq_buf);

raw_json = rd_malloc(len + 1);
rd_slice_read(&slice, raw_json, len);
raw_json[len] = '\0';

/* Parse JSON */
*jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);

if (!*jsonp)
herr = rd_http_error_new(hreq->hreq_code,
"Failed to parse JSON response "
"at %" PRIusz "/%" PRIusz,
(size_t)(end - raw_json), len);
rd_free(raw_json);
return herr;
}


/**
* @brief Check if the error returned from HTTP(S) is temporary or not.
*
* @returns If the \p error_code is temporary, return rd_true,
* otherwise return rd_false.
*
* @locality Any thread.
*/
static rd_bool_t rd_http_is_failure_temporary(int error_code) {
switch (error_code) {
case 408: /**< Request timeout */
case 425: /**< Too early */
case 500: /**< Internal server error */
case 502: /**< Bad gateway */
case 503: /**< Service unavailable */
case 504: /**< Gateway timeout */
return rd_true;

default:
return rd_false;
}
jliunyu marked this conversation as resolved.
Show resolved Hide resolved
}


/**
* @brief Perform a blocking HTTP(S) request to \p url with
* HTTP(S) headers and data with \p timeout_s.
* If the HTTP(S) request fails, will retry another \p retries times
* with multiplying backoff \p retry_ms.
*
* @returns The result will be returned in \p *jsonp.
* Returns NULL on success (HTTP response code < 400), or an error
* object on transport, HTTP error or a JSON parsing error - this
* error object must be destroyed by calling rd_http_error_destroy().
*
* @locality Any thread.
*/
rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
const char *url,
const struct curl_slist *headers,
const char *post_fields,
size_t post_fields_size,
int timeout_s,
int retries,
int retry_ms,
cJSON **jsonp) {
rd_http_error_t *herr;
rd_http_req_t hreq;
int i;
size_t len;
const char *content_type;

herr = rd_http_req_init(&hreq, url);
if (unlikely(herr != NULL))
return herr;

curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout_s);

curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDSIZE,
post_fields_size);
curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDS, post_fields);

for (i = 0; i <= retries; i++) {
if (rd_kafka_terminating(rk)) {
rd_http_req_destroy(&hreq);
return rd_http_error_new(-1, "Terminating");
}

herr = rd_http_req_perform_sync(&hreq);
len = rd_buf_len(hreq.hreq_buf);

if (!herr) {
if (len > 0)
break; /* Success */
/* Empty response */
rd_http_req_destroy(&hreq);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you will need to set jsonp to NULL here, otherwise the caller will think this was a successful call but jsonp will be undefined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @edenhill, hanks for the review comments. The jsonp is initialized as NULL from the caller already: https://github.com/jliunyu/librdkafka/blob/kip768-token/src/rdkafka_sasl_oauthbearer_oidc.c#L223.
From the current function, we didn't use it yet before this line. So I think we don't need to set jsonp to NULL here?

return NULL;
}
/* Retry if HTTP(S) request returns temporary error and there
* are remaining retries, else fail. */
if (i == retries || !rd_http_is_failure_temporary(herr->code)) {
rd_http_req_destroy(&hreq);
return herr;
}

/* Retry */
rd_http_error_destroy(herr);
rd_usleep(retry_ms * 1000 * (i + 1), &rk->rk_terminate);
}

content_type = rd_http_req_get_content_type(&hreq);

if (!content_type || rd_strncasecmp(content_type, "application/json",
strlen("application/json"))) {
if (!herr)
herr = rd_http_error_new(
hreq.hreq_code, "Response is not JSON encoded: %s",
content_type ? content_type : "(n/a)");
rd_http_req_destroy(&hreq);
return herr;
}

herr = rd_http_parse_json(&hreq, jsonp);

rd_http_req_destroy(&hreq);

jliunyu marked this conversation as resolved.
Show resolved Hide resolved
return herr;
}


/**
* @brief Same as rd_http_get() but requires a JSON response.
* The response is parsed and a JSON object is returned in \p *jsonp.
Expand Down
13 changes: 12 additions & 1 deletion src/rdhttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,20 @@ typedef struct rd_http_req_s {
* write to. */
} rd_http_req_t;

static void rd_http_req_destroy(rd_http_req_t *hreq);
rd_http_error_t *rd_http_req_init(rd_http_req_t *hreq, const char *url);
rd_http_error_t *rd_http_req_perform_sync(rd_http_req_t *hreq);
rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp);
rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
const char *url,
const struct curl_slist *headers,
const char *data_to_token,
size_t data_to_token_size,
int timeout_s,
int retry,
int retry_ms,
cJSON **jsonp);
void rd_http_req_destroy(rd_http_req_t *hreq);

#endif


Expand Down
22 changes: 17 additions & 5 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_CURL
#include "rdkafka_sasl_oauthbearer_oidc.h"
#endif
#if WITH_SSL
#include "rdkafka_ssl.h"
#endif
Expand Down Expand Up @@ -2238,11 +2241,20 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
&rk->rk_conf, rd_kafka_oauthbearer_unsecured_token);

if (rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
if (rk->rk_conf.sasl.oauthbearer.token_refresh_cb &&
rk->rk_conf.sasl.oauthbearer.method !=
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC)
rk->rk_conf.enabled_events |=
RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
#endif

#if WITH_CURL
if (rk->rk_conf.sasl.oauthbearer.method ==
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
!rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
edenhill marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
&rk->rk_conf, rd_kafka_oidc_token_refresh_cb);
#endif
rk->rk_controllerid = -1;

/* Admin client defaults */
Expand Down Expand Up @@ -2330,7 +2342,6 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
}


if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
/* Select SASL provider */
Expand Down Expand Up @@ -2404,10 +2415,11 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
* out from rd_kafka_new(). */
if (rk->rk_conf.background_event_cb ||
(rk->rk_conf.enabled_events & RD_KAFKA_EVENT_BACKGROUND)) {
rd_kafka_resp_err_t err;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
rd_kafka_wrlock(rk);
err =
rd_kafka_background_thread_create(rk, errstr, errstr_size);
if (!rk->rk_background.q)
err = rd_kafka_background_thread_create(rk, errstr,
errstr_size);
rd_kafka_wrunlock(rk);
if (err)
goto fail;
Expand Down
39 changes: 36 additions & 3 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -3577,8 +3577,7 @@ static void rd_kafka_sw_str_sanitize_inplace(char *str) {
* on success. The array count is returned in \p cntp.
* The returned pointer must be freed with rd_free().
*/
static RD_UNUSED char **
rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp) {
char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp) {
size_t i;
char **out, *p;
size_t lens = 0;
Expand Down Expand Up @@ -3686,12 +3685,46 @@ const char *rd_kafka_conf_finalize(rd_kafka_type_t cltype,
"`sasl.oauthbearer.method=oidc` are "
"mutually exclusive";

if (conf->sasl.oauthbearer.method ==
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC) {
if (!conf->sasl.oauthbearer.client_id)
return "`sasl.oauthbearer.client.id` is "
"mandatory when "
"`sasl.oauthbearer.method=oidc` is set";

if (!conf->sasl.oauthbearer.client_secret) {
return "`sasl.oauthbearer.client.secret` is "
"mandatory when "
"`sasl.oauthbearer.method=oidc` is set";
}

if (!conf->sasl.oauthbearer.token_endpoint_url) {
return "`sasl.oauthbearer.token.endpoint.url` "
"is mandatory when "
"`sasl.oauthbearer.method=oidc` is set";
}

if (!conf->sasl.oauthbearer.scope) {
return "`sasl.oauthbearer.scope` "
"is mandatory when "
"`sasl.oauthbearer.method=oidc` is set";
}

if (!conf->sasl.oauthbearer.extensions_str) {
return "`sasl.oauthbearer.extensions` "
"is mandatory when "
"`sasl.oauthbearer.method=oidc` is set";
}
}

/* Enable background thread for the builtin OIDC handler,
* unless a refresh callback has been set. */
if (conf->sasl.oauthbearer.method ==
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
!conf->sasl.oauthbearer.token_refresh_cb)
!conf->sasl.oauthbearer.token_refresh_cb) {
conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
conf->sasl.enable_callback_queue = 1;
}
}

#endif
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ struct rd_kafka_topic_conf_s {
};


char **rd_kafka_conf_kv_split(const char **input, size_t incnt, size_t *cntp);

void rd_kafka_anyconf_destroy(int scope, void *conf);

Expand Down
15 changes: 8 additions & 7 deletions src/rdkafka_sasl_oauthbearer.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
#include <openssl/evp.h>
#include "rdunittest.h"

#if WITH_CURL
#include "rdkafka_sasl_oauthbearer_oidc.h"
#endif


/**
Expand Down Expand Up @@ -1321,17 +1324,15 @@ static int rd_kafka_sasl_oauthbearer_init(rd_kafka_t *rk,
handle->callback_q = rd_kafka_q_keep(rk->rk_rep);
}

#if WITH_CURL
if (rk->rk_conf.sasl.oauthbearer.method ==
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
#if FIXME /************************ FIXME when .._oidc.c is added ****/
rk->rk_conf.sasl.oauthbearer.token_refresh_cb ==
rd_kafka_sasl_oauthbearer_oidc_token_refresh_cb
#else
1
#endif
) /* move this paren up on the .._refresh_cb
* line when FIXME is fixed. */
rd_kafka_oidc_token_refresh_cb) {
handle->internal_refresh = rd_true;
rd_kafka_sasl_background_callbacks_enable(rk);
}
#endif

/* Otherwise enqueue a refresh callback for the application. */
rd_kafka_oauthbearer_enqueue_token_refresh(handle);
Expand Down
Loading