Skip to content

Commit

Permalink
Merge branch 'master' into dev_kip430_cp
Browse files Browse the repository at this point in the history
  • Loading branch information
jainruchir authored Jun 14, 2023
2 parents 7c15746 + 865cf60 commit 99a368d
Show file tree
Hide file tree
Showing 24 changed files with 869 additions and 28 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ librdkafka v2.2.0 is a feature release:
(#4184, #4291, #4252).
* Fix several bugs with sticky assignor in case of partition ownership
changing between members of the consumer group (#4252).
* [KIP-368](https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate):
Allow SASL Connections to Periodically Re-Authenticate
(#4301, started by @vctoriawu).
* Avoid treating an OpenSSL error as a permanent error and treat unclean SSL
closes as normal ones (#4294).


## Fixes
Expand All @@ -23,6 +28,15 @@ librdkafka v2.2.0 is a feature release:
when using Confluent Platform, only when racks are set,
observers are activated and there is more than one partition.
Fixed by skipping the correct amount of bytes when tags are received.
* Avoid treating an OpenSSL error as a permanent error and treat unclean SSL
closes as normal ones. When SSL connections are closed without `close_notify`,
in OpenSSL 3.x a new type of error is set and it was interpreted as permanent
in librdkafka. It can cause a different issue depending on the RPC.
If received when waiting for OffsetForLeaderEpoch response, it triggers
an offset reset following the configured policy.
Solved by treating SSL errors as transport errors and
by setting an OpenSSL flag that allows to treat unclean SSL closes as normal
ones. These types of errors can happen it the other side doesn't support `close_notify` or if there's a TCP connection reset.


### Consumer fixes
Expand Down
4 changes: 2 additions & 2 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1917,7 +1917,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-359 - Producer: use EpochLeaderId | 2.4.0 | Not supported |
| KIP-360 - Improve handling of unknown Idempotent Producer | 2.5.0 | Supported |
| KIP-361 - Consumer: add config to disable auto topic creation | 2.3.0 | Supported |
| KIP-368 - SASL periodic reauth | 2.2.0 | Not supported |
| KIP-368 - SASL periodic reauth | 2.2.0 | Supported |
| KIP-369 - Always roundRobin partitioner | 2.4.0 | Not supported |
| KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) |
| KIP-392 - Allow consumers to fetch from closest replica | 2.4.0 | Supported |
Expand Down Expand Up @@ -1996,7 +1996,7 @@ release of librdkafka.
| 28 | TxnOffsetCommit | 3 | 3 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 1 |
| 36 | SaslAuthenticate | 2 | 0 |
| 36 | SaslAuthenticate | 2 | 1 |
| 37 | CreatePartitions | 3 | 0 |
| 42 | DeleteGroups | 2 | 1 |
| 47 | OffsetDelete | 0 | 0 |
Expand Down
98 changes: 93 additions & 5 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@
static const int rd_kafka_max_block_ms = 1000;

const char *rd_kafka_broker_state_names[] = {
"INIT", "DOWN", "TRY_CONNECT", "CONNECT", "SSL_HANDSHAKE",
"AUTH_LEGACY", "UP", "UPDATE", "APIVERSION_QUERY", "AUTH_HANDSHAKE",
"AUTH_REQ"};
"INIT", "DOWN", "TRY_CONNECT", "CONNECT", "SSL_HANDSHAKE",
"AUTH_LEGACY", "UP", "UPDATE", "APIVERSION_QUERY", "AUTH_HANDSHAKE",
"AUTH_REQ", "REAUTH"};

const char *rd_kafka_secproto_names[] = {
[RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
Expand Down Expand Up @@ -573,6 +573,8 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
rkb->rkb_recv_buf = NULL;
}

rkb->rkb_reauth_in_progress = rd_false;

va_start(ap, fmt);
rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
va_end(ap);
Expand All @@ -591,6 +593,11 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
old_state = rkb->rkb_state;
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);

/* Stop any pending reauth timer, since a teardown/reconnect will
* require a new timer. */
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr,
1 /*lock*/);

/* Unlock broker since a requeue will try to lock it. */
rd_kafka_broker_unlock(rkb);

Expand Down Expand Up @@ -1834,7 +1841,7 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb,
*/
static int rd_kafka_req_response(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf) {
rd_kafka_buf_t *req;
rd_kafka_buf_t *req = NULL;
int log_decode_errors = LOG_ERR;

rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
Expand Down Expand Up @@ -2237,7 +2244,8 @@ static int rd_kafka_broker_connect(rd_kafka_broker_t *rkb) {
*/
void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {

rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;
rkb->rkb_reauth_in_progress = rd_false;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
Expand Down Expand Up @@ -3451,6 +3459,20 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
wakeup = rd_true;
break;

case RD_KAFKA_OP_SASL_REAUTH:
rd_rkb_dbg(rkb, BROKER, "REAUTH", "Received REAUTH op");

/* We don't need a lock for rkb_max_inflight. It's changed only
* on the broker thread. */
rkb->rkb_max_inflight = 1;

rd_kafka_broker_lock(rkb);
rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_REAUTH);
rd_kafka_broker_unlock(rkb);

wakeup = rd_true;
break;

default:
rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
break;
Expand Down Expand Up @@ -4528,8 +4550,15 @@ static int rd_kafka_broker_thread_main(void *arg) {
rd_kafka_broker_addresses_exhausted(rkb))
rd_kafka_broker_update_reconnect_backoff(
rkb, &rkb->rkb_rk->rk_conf, rd_clock());
/* If we haven't made progress from the last state, and
* if we have exceeded
* socket_connection_setup_timeout_ms, then error out.
* Don't error out in case this is a reauth, for which
* socket_connection_setup_timeout_ms is not
* applicable. */
else if (
rkb->rkb_state == orig_state &&
!rkb->rkb_reauth_in_progress &&
rd_clock() >=
(rkb->rkb_ts_connect +
(rd_ts_t)rk->rk_conf
Expand All @@ -4544,6 +4573,22 @@ static int rd_kafka_broker_thread_main(void *arg) {

break;

case RD_KAFKA_BROKER_STATE_REAUTH:
/* Since we've already authenticated once, the provider
* should be ready. */
rd_assert(rd_kafka_sasl_ready(rkb->rkb_rk));

/* Since we aren't disconnecting, the transport isn't
* destroyed, and as a consequence, some of the SASL
* state leaks unless we destroy it before the reauth.
*/
rd_kafka_sasl_close(rkb->rkb_transport);

rkb->rkb_reauth_in_progress = rd_true;

rd_kafka_broker_connect_auth(rkb);
break;

case RD_KAFKA_BROKER_STATE_UPDATE:
/* FALLTHRU */
case RD_KAFKA_BROKER_STATE_UP:
Expand Down Expand Up @@ -4672,6 +4717,9 @@ void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
mtx_unlock(&rkb->rkb_logname_lock);
mtx_destroy(&rkb->rkb_logname_lock);

rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr,
1 /*lock*/);

mtx_destroy(&rkb->rkb_lock);

rd_refcnt_destroy(&rkb->rkb_refcnt);
Expand Down Expand Up @@ -5851,6 +5899,46 @@ void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon) {
rd_kafka_broker_destroy(rkb);
}

/**
* @brief Starts the reauth timer for this broker.
* If connections_max_reauth_ms=0, then no timer is set.
*
* @locks none
* @locality broker thread
*/
void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
int64_t connections_max_reauth_ms) {
/* Timer should not already be started. It indicates that we're about to
* schedule an extra reauth, but this shouldn't be a cause for failure
* in production use cases, so, clear the timer. */
if (rd_kafka_timer_is_started(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr))
rd_kafka_timer_stop(&rkb->rkb_rk->rk_timers,
&rkb->rkb_sasl_reauth_tmr, 1 /*lock*/);

if (connections_max_reauth_ms == 0)
return;

rd_kafka_timer_start_oneshot(
&rkb->rkb_rk->rk_timers, &rkb->rkb_sasl_reauth_tmr, rd_false,
connections_max_reauth_ms * 900 /* 90% * microsecond*/,
rd_kafka_broker_start_reauth_cb, (void *)rkb);
}

/**
* @brief Starts the reauth process for the broker rkb.
*
* @locks none
* @locality main thread
*/
void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *_rkb) {
rd_kafka_op_t *rko = NULL;
rd_kafka_broker_t *rkb = (rd_kafka_broker_t *)_rkb;
rd_dassert(rkb);
rko = rd_kafka_op_new(RD_KAFKA_OP_SASL_REAUTH);
rd_kafka_q_enq(rkb->rkb_ops, rko);
}

/**
* @name Unit tests
* @{
Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum {
RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE,
RD_KAFKA_BROKER_STATE_AUTH_REQ,
RD_KAFKA_BROKER_STATE_REAUTH,
} rd_kafka_broker_state_t;

/**
Expand Down Expand Up @@ -252,6 +253,9 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
/** Absolute time of last connection attempt. */
rd_ts_t rkb_ts_connect;

/** True if a reauthentication is in progress. */
rd_bool_t rkb_reauth_in_progress;

/**< Persistent connection demand is tracked by
* a counter for each type of demand.
* The broker thread will maintain a persistent connection
Expand Down Expand Up @@ -323,6 +327,9 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
rd_kafka_resp_err_t err; /**< Last error code */
int cnt; /**< Number of identical errors */
} rkb_last_err;


rd_kafka_timer_t rkb_sasl_reauth_tmr;
};

#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
Expand Down Expand Up @@ -602,6 +609,11 @@ void rd_kafka_broker_monitor_add(rd_kafka_broker_monitor_t *rkbmon,

void rd_kafka_broker_monitor_del(rd_kafka_broker_monitor_t *rkbmon);

void rd_kafka_broker_start_reauth_timer(rd_kafka_broker_t *rkb,
int64_t connections_max_reauth_ms);

void rd_kafka_broker_start_reauth_cb(rd_kafka_timers_t *rkts, void *rkb);

int unittest_broker(void);

#endif /* _RDKAFKA_BROKER_H_ */
2 changes: 1 addition & 1 deletion src/rdkafka_feature.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static const struct rd_kafka_feature_map {
.depends =
{
{RD_KAFKAP_SaslHandshake, 1, 1},
{RD_KAFKAP_SaslAuthenticate, 0, 0},
{RD_KAFKAP_SaslAuthenticate, 0, 1},
{-1},
},
},
Expand Down
4 changes: 1 addition & 3 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -998,10 +998,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
&rk->rk_timers, &rktp->rktp_validate_tmr, rd_false,
500 * 1000 /* 500ms */,
rd_kafka_offset_validate_tmr_cb, rktp);
goto done;
}

if (!(actions & RD_KAFKA_ERR_ACTION_REFRESH)) {
} else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
/* Permanent error */
rd_kafka_offset_reset(
rktp, rd_kafka_broker_id(rkb),
Expand Down
10 changes: 6 additions & 4 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
[RD_KAFKA_OP_TXN] = "REPLY:TXN",
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
"REPLY:GET_REBALANCE_PROTOCOL",
[RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
[RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
[RD_KAFKA_OP_LEADERS] = "REPLY:LEADERS",
[RD_KAFKA_OP_BARRIER] = "REPLY:BARRIER",
[RD_KAFKA_OP_SASL_REAUTH] = "REPLY:SASL_REAUTH",
};

if (type & RD_KAFKA_OP_REPLY)
Expand Down Expand Up @@ -257,8 +258,9 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
[RD_KAFKA_OP_TXN] = sizeof(rko->rko_u.txn),
[RD_KAFKA_OP_GET_REBALANCE_PROTOCOL] =
sizeof(rko->rko_u.rebalance_protocol),
[RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
[RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
[RD_KAFKA_OP_LEADERS] = sizeof(rko->rko_u.leaders),
[RD_KAFKA_OP_BARRIER] = _RD_KAFKA_OP_EMPTY,
[RD_KAFKA_OP_SASL_REAUTH] = _RD_KAFKA_OP_EMPTY,
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ typedef enum {
RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */
RD_KAFKA_OP_LEADERS, /**< Partition leader query */
RD_KAFKA_OP_BARRIER, /**< Version barrier bump */
RD_KAFKA_OP_SASL_REAUTH, /**< Sasl reauthentication for broker */
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

Expand Down
19 changes: 19 additions & 0 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ int rd_kafka_err_action(rd_kafka_broker_t *rkb,
break;

case RD_KAFKA_RESP_ERR__TRANSPORT:
case RD_KAFKA_RESP_ERR__SSL:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
Expand Down Expand Up @@ -2631,6 +2632,18 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,

rd_kafka_buf_read_bytes(rkbuf, &auth_data);

if (request->rkbuf_reqhdr.ApiVersion >= 1) {
int64_t session_lifetime_ms;
rd_kafka_buf_read_i64(rkbuf, &session_lifetime_ms);

if (session_lifetime_ms)
rd_kafka_dbg(
rk, SECURITY, "REAUTH",
"Received session lifetime %ld ms from broker",
session_lifetime_ms);
rd_kafka_broker_start_reauth_timer(rkb, session_lifetime_ms);
}

/* Pass SASL auth frame to SASL handler */
if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data,
(size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr,
Expand Down Expand Up @@ -2664,6 +2677,8 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion;
int features;

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0);

Expand All @@ -2678,6 +2693,10 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
* close down the connection and reconnect on failure. */
rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_SaslAuthenticate, 0, 1, &features);
rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

if (replyq.q)
rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
opaque);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque);

void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
const void *buf,
size_t size,
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_sasl.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ int rd_kafka_sasl_io_event(rd_kafka_transport_t *rktrans,
* @remark May be called on non-SASL transports (no-op)
*/
void rd_kafka_sasl_close(rd_kafka_transport_t *rktrans) {
/* The broker might not be up, and the transport might not exist in that
* case.*/
if (!rktrans)
return;

const struct rd_kafka_sasl_provider *provider =
rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider;

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_sasl_cyrus.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ static void rd_kafka_sasl_cyrus_close(struct rd_kafka_transport_s *rktrans) {
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
}
rd_free(state);
rktrans->rktrans_sasl.state = NULL;
}


Expand Down
Loading

0 comments on commit 99a368d

Please sign in to comment.