Skip to content

Commit

Permalink
Address review comments: Part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Aug 9, 2023
1 parent 45780f8 commit 61840ab
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 65 deletions.
9 changes: 5 additions & 4 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -8087,7 +8087,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
int i;
rd_kafka_op_t *rko_result = NULL;

err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);
err = rd_kafka_parse_Metadata_admin(rkb, reply, &topics, &mdi);
if (err)
goto err;

Expand Down Expand Up @@ -8154,6 +8154,8 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk,
if (topics_cnt == 0) {
rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__INVALID_ARG,
"No topics to describe");
rd_kafka_admin_common_worker_destroy(rk, rko,
rd_true /*destroy*/);
return;
}

Expand Down Expand Up @@ -8257,11 +8259,10 @@ rd_kafka_DescribeCluster_result_description(
static rd_kafka_ClusterDescription_t *
rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) {
const rd_kafka_metadata_t *md = &mdi->metadata;
rd_kafka_ClusterDescription_t *clusterdesc;
rd_kafka_ClusterDescription_t *clusterdesc = rd_calloc(1, sizeof(*clusterdesc));
rd_list_t *authorized_operations = rd_kafka_AuthorizedOperations_parse(
mdi->cluster_authorized_operations);
int i;
clusterdesc = rd_calloc(1, sizeof(*clusterdesc));

clusterdesc->cluster_id = rd_strdup(mdi->cluster_id);
clusterdesc->controller_id = mdi->controller_id;
Expand Down Expand Up @@ -8345,7 +8346,7 @@ rd_kafka_DescribeClusterResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
rd_kafka_op_t *rko_result = NULL;

err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);
err = rd_kafka_parse_Metadata_admin(rkb, reply, &topics, &mdi);
if (err)
goto err;

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2055,7 +2055,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
/* force_racks is true if any memeber has a client rack set,
since we will require partition to rack mapping in that
case for rack-aware assignors. */
any_member_rack, rko, rd_false /* force */, NULL);
any_member_rack, rko);
rd_list_destroy(&topics);

} else {
Expand Down
96 changes: 61 additions & 35 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ rd_kafka_metadata(rd_kafka_t *rk,
* topics in the cluster, since a
* partial request may make it seem
* like some subscribed topics are missing. */
all_topics ? rd_true : rd_false, rd_false /* force_racks */, rko,
rd_false /* force */, NULL);
all_topics ? rd_true : rd_false, rd_false /* force_racks */, rko);

rd_list_destroy(&topics);
rd_kafka_broker_destroy(rkb);
Expand Down Expand Up @@ -475,26 +474,14 @@ rd_kafka_populate_metadata_topic_racks(rd_tmpabuf_t *tbuf,
}
}


/**
* @brief Handle a Metadata response message.
*
* @param topics are the requested topics (may be NULL).
* @param request_topics Used when rd_kafka_buf_t* request is NULL.
*
* The metadata will be marshalled into 'rd_kafka_metadata_internal_t *'.
*
* The marshalled metadata is returned in \p *mdip, (NULL on error).
*
* @returns an error code on parse failure, else NO_ERRRO.
*
* @locality rdkafka main thread
*/
rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdip,
rd_list_t *request_topics) {
/* Internal implementation for parsing Metadata. */
static rd_kafka_resp_err_t
rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdip,
rd_list_t *request_topics,
const char *reason) {
rd_kafka_t *rk = rkb->rkb_rk;
int i, j, k;
rd_tmpabuf_t tbuf;
Expand All @@ -512,7 +499,6 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
: rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
const char *reason = "(no reason)";
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
int32_t controller_id = -1;
Expand All @@ -530,11 +516,9 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
request ? request->rkbuf_u.Metadata.force_racks : rd_false;
rd_bool_t compute_racks = has_client_rack || force_rack_computation;

/* If there's no request, we're parsing this for an AdminAPI. */
if (!request)
reason = "(admin request)";
else if (request->rkbuf_u.Metadata.reason)
reason = request->rkbuf_u.Metadata.reason;
/* If there's reason is NULL, set it to a human-readable string. */
if (!reason)
reason = "(no reason)";

/* Ignore metadata updates when terminating */
if (rd_kafka_terminating(rkb->rkb_rk)) {
Expand Down Expand Up @@ -611,7 +595,8 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,

if (ApiVersion >= 2) {
rd_kafka_buf_read_str(rkbuf, &cluster_id);
mdi->cluster_id = rd_tmpabuf_write_str(&tbuf, cluster_id.str);
if (cluster_id.str)
mdi->cluster_id = rd_tmpabuf_write_str(&tbuf, cluster_id.str);
}


Expand Down Expand Up @@ -1029,6 +1014,50 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
}


/**
* @brief Handle a Metadata response message.
*
* @param request Initial Metadata request, containing the topic information.
* Must not be NULL.
* @param mdip A pointer to (rd_kafka_metadata_internal_t *) into which the
* metadata will be marshalled (set to NULL on error.)
*
* @returns an error code on parse failure, else NO_ERRRO.
*
* @locality rdkafka main thread
*/
rd_kafka_resp_err_t
rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdip) {
const char *reason = request->rkbuf_u.Metadata.reason;
return rd_kafka_parse_Metadata0(rkb, request, rkbuf, mdip, NULL,
reason);
}

/**
* @brief Handle a Metadata response message for admin requests.
*
* @param request_topics List containing topics in Metadata request. Must not be
* NULL.
* @param mdip A pointer to (rd_kafka_metadata_internal_t *) into which the
* metadata will be marshalled (set to NULL on error.)
*
* @returns an error code on parse failure, else NO_ERRRO.
*
* @locality rdkafka main thread
*/
rd_kafka_resp_err_t
rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf,
rd_list_t *request_topics,
rd_kafka_metadata_internal_t **mdip) {
return rd_kafka_parse_Metadata0(rkb, NULL, rkbuf, mdip, request_topics,
"(admin request)");
}


/**
* @brief Add all topics in current cached full metadata
* that matches the topics in \p match
Expand Down Expand Up @@ -1305,8 +1334,7 @@ rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);

rd_kafka_MetadataRequest(rkb, &q_topics, reason, allow_auto_create,
cgrp_update, rd_false /* force_racks */, NULL,
rd_false /* force */, NULL);
cgrp_update, rd_false /* force_racks */, NULL);

rd_list_destroy(&q_topics);

Expand Down Expand Up @@ -1483,8 +1511,7 @@ rd_kafka_resp_err_t rd_kafka_metadata_refresh_all(rd_kafka_t *rk,
rd_list_init(&topics, 0, NULL); /* empty list = all topics */
rd_kafka_MetadataRequest(
rkb, &topics, reason, rd_false /*no auto create*/,
rd_true /*cgrp update*/, rd_false /* force_rack */, NULL,
0 /* force */, NULL);
rd_true /*cgrp update*/, rd_false /* force_rack */, NULL);
rd_list_destroy(&topics);

if (destroy_rkb)
Expand Down Expand Up @@ -1522,8 +1549,7 @@ rd_kafka_metadata_request(rd_kafka_t *rk,
}

rd_kafka_MetadataRequest(rkb, topics, reason, allow_auto_create_topics,
cgrp_update, rd_false /* force racks */, rko,
rd_false /* force */, NULL);
cgrp_update, rd_false /* force racks */, rko);

if (destroy_rkb)
rd_kafka_broker_destroy(rkb);
Expand Down
8 changes: 6 additions & 2 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb);
rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdp,
rd_list_t *request_topics);
rd_kafka_metadata_internal_t **mdip);

rd_kafka_resp_err_t rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf,
rd_list_t *request_topics,
rd_kafka_metadata_internal_t **mdip);

rd_kafka_metadata_internal_t *
rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *mdi, size_t size);
Expand Down
63 changes: 43 additions & 20 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2119,7 +2119,7 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
rd_list_cnt(topics),
request->rkbuf_u.Metadata.reason);

err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &mdi, NULL);
err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &mdi);
if (err)
goto err;

Expand Down Expand Up @@ -2177,7 +2177,21 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
}


/* Internal implementation for MetadataRequests. */
/**
* @brief Internal implementation of MetadataRequest (does not send).
*
* @param force - 1: force a full request (including all topics and brokers)
* even if there is such a request already in flight.
* - 0: check if there are multiple outstanding full requests, and
* don't send one if there is already one present.
* (See note below.)
*
* If full metadata for all topics is requested (or
* all brokers, which results in all-topics on older brokers) and there is
* already a full request in transit then this function will return
* RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS otherwise RD_KAFKA_RESP_ERR_NO_ERROR.
* If \p rko is non-NULL or if \p force is true, the request is sent regardless.
*/
static rd_kafka_resp_err_t
rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
Expand All @@ -2189,17 +2203,19 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
rd_bool_t force_racks,
rd_kafka_op_t *rko,
rd_kafka_resp_cb_t *resp_cb,
int force,
rd_bool_t force,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion = 0;
size_t of_TopicArrayCnt;
int features;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
int *full_incr = NULL;
void *handler_arg;
rd_kafka_resp_cb_t *handler_cb = rd_kafka_handle_Metadata;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 8, &features);
rkb, RD_KAFKAP_Metadata, 0, 9, &features);
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
ApiVersion >= 9);
Expand Down Expand Up @@ -2338,6 +2354,21 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
* and should go before most other requests (Produce, Fetch, etc). */
rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH;

/* The default handler is rd_kafka_handle_Metadata, but it can be
* overriden to use a custom handler. */
if (resp_cb)
handler_cb = resp_cb;

/* If a custom handler is provided, we also allow the caller to set a
* custom argument which is passed as the opaque argument to the
* handler. However, if we're using the default handler, it expects
* either rko or NULL as its opaque argument (it forwards the response
* to rko's replyq if it's non-NULL). */
if (resp_cb && opaque)
handler_arg = opaque;
else
handler_arg = rko;

rd_kafka_broker_buf_enq_replyq(
rkb, rkbuf,
/* Handle response thru rk_ops,
Expand All @@ -2346,12 +2377,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0),
/* The default response handler is rd_kafka_handle_Metadata, but we
allow alternate handlers to be configured. */
resp_cb ? resp_cb : rd_kafka_handle_Metadata,
/* rd_kafka_handle_Metadata (the default handler) makes use of the
rko (if set) to forward the response to the rko's replyq. So, in
case the opaque value hasn't been overrriden, we set it to the
rko. */
rko ? rko : opaque);
handler_cb, handler_arg);

return RD_KAFKA_RESP_ERR_NO_ERROR;
}
Expand All @@ -2377,35 +2403,31 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
* @param rko - (optional) rko with replyq for handling response.
* Specifying an rko forces a metadata request even if
* there is already a matching one in-transit.
* @param force - 1: force a full request (including all topics and brokers)
* even if there is such a request already in flight.
* - 0: check if there are multiple outstanding full requests, and
* don't send one if there is already one present.
* (See note below.)
*
* If full metadata for all topics is requested (or
* all brokers, which results in all-topics on older brokers) and there is
* already a full request in transit then this function will return
* RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS otherwise RD_KAFKA_RESP_ERR_NO_ERROR.
* If \p rko is non-NULL or if \p force is true, the request is sent regardless.
* If \p rko is non-NULL, the request is sent regardless.
*/
rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t allow_auto_create_topics,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_op_t *rko,
rd_bool_t force,
void *opaque) {
rd_kafka_op_t *rko) {
return rd_kafka_MetadataRequest0(
rkb, topics, reason, allow_auto_create_topics,
/* cluster and topic authorized operations are used by admin
operations only. */
rd_false, rd_false, cgrp_update, force_racks, rko,
/* In all other situations apart from admin ops, we use
rd_kafka_handle_Metadata rather than a custom resp_cb */
NULL, force, opaque);
NULL,
/* If the request needs to be forced, rko_u.metadata.force will be
set. */
rd_false, NULL);
}


Expand All @@ -2428,6 +2450,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
* @param force_racks - Force partition to rack mapping computation in
* parse_Metadata (see comment there).
* @param resp_cb - callback to be used for handling response.
* @param opaque - (optional) parameter to be passed to resp_cb.
*/
rd_kafka_resp_err_t
rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb,
Expand Down
4 changes: 1 addition & 3 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
rd_bool_t allow_auto_create_topics,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_op_t *rko,
rd_bool_t force,
void *opaque);
rd_kafka_op_t *rko);

rd_kafka_resp_err_t
rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb,
Expand Down

0 comments on commit 61840ab

Please sign in to comment.