Skip to content

Commit

Permalink
Move Admin request arguments to result op to make them available on m…
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Aug 24, 2021
1 parent 6c12e4c commit ca1b30e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 12 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ librdkafka v1.8.0 is a security release:
from the broker.


### Admin fixes

* `DeleteRecords()` could crash if one of the underlying requests
(for a given partition leader) failed at the transport level (e.g., timeout).
(#3476).



# librdkafka v1.7.0

Expand Down
61 changes: 49 additions & 12 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,10 @@ rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
*/

/**
* @brief Create a new admin_result op based on the request op \p rko_req
* @brief Create a new admin_result op based on the request op \p rko_req.
*
* @remark This moves the rko_req's admin_request.args list from \p rko_req
* to the returned rko. The \p rko_req args will be emptied.
*/
static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) {
rd_kafka_op_t *rko_result;
Expand Down Expand Up @@ -361,6 +364,12 @@ static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) {
rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request.
options.opaque);

/* Move request arguments (list) from request to result.
* This is mainly so that partial_response() knows what arguments
* were provided to the response's request it is merging. */
rd_list_move(&rko_result->rko_u.admin_result.args,
&rko_req->rko_u.admin_request.args);

rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type;

return rko_result;
Expand Down Expand Up @@ -1712,7 +1721,7 @@ rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req,
* in the same order as they were requested. The broker
* does not maintain ordering unfortunately. */
skel.topic = terr->topic;
orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
&skel, rd_kafka_NewTopic_cmp);
if (orig_pos == -1) {
rd_kafka_topic_result_destroy(terr);
Expand Down Expand Up @@ -1930,7 +1939,7 @@ rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req,
* in the same order as they were requested. The broker
* does not maintain ordering unfortunately. */
skel.topic = terr->topic;
orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
&skel, rd_kafka_DeleteTopic_cmp);
if (orig_pos == -1) {
rd_kafka_topic_result_destroy(terr);
Expand Down Expand Up @@ -2224,7 +2233,7 @@ rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req,
* in the same order as they were requested. The broker
* does not maintain ordering unfortunately. */
skel.topic = terr->topic;
orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
&skel, rd_kafka_NewPartitions_cmp);
if (orig_pos == -1) {
rd_kafka_topic_result_destroy(terr);
Expand Down Expand Up @@ -2796,7 +2805,7 @@ rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req,
* does not maintain ordering unfortunately. */
skel.restype = config->restype;
skel.name = config->name;
orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
&skel, rd_kafka_ConfigResource_cmp);
if (orig_pos == -1) {
rd_kafka_ConfigResource_destroy(config);
Expand Down Expand Up @@ -3101,7 +3110,7 @@ rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req,
* does not maintain ordering unfortunately. */
skel.restype = config->restype;
skel.name = config->name;
orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
&skel, rd_kafka_ConfigResource_cmp);
if (orig_pos == -1)
rd_kafka_buf_parse_fail(
Expand Down Expand Up @@ -3257,13 +3266,40 @@ rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout,
rd_assert(rko_partial->rko_evtype ==
RD_KAFKA_EVENT_DELETERECORDS_RESULT);

/* Partitions from the DeleteRecordsResponse */
partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);

/* Partitions (offsets) from the DeleteRecords() call */
/* All partitions (offsets) from the DeleteRecords() call */
respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request.
fanout.results, 0);

if (rko_partial->rko_err) {
/* If there was a request-level error, set the error on
* all requested partitions for this request. */
const rd_kafka_topic_partition_list_t *reqpartitions;
rd_kafka_topic_partition_t *reqpartition;

/* Partitions (offsets) from this DeleteRecordsRequest */
reqpartitions = rd_list_elem(&rko_partial->rko_u.
admin_result.args, 0);

RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) {
rd_kafka_topic_partition_t *respart;

/* Find result partition */
respart = rd_kafka_topic_partition_list_find(
respartitions,
reqpartition->topic,
reqpartition->partition);

rd_assert(respart || !*"respart not found");

respart->err = rko_partial->rko_err;
}

return;
}

/* Partitions from the DeleteRecordsResponse */
partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);

RD_KAFKA_TPLIST_FOREACH(partition, partitions) {
rd_kafka_topic_partition_t *respart;

Expand Down Expand Up @@ -3898,8 +3934,7 @@ rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req,
rd_kafka_op_t *rko_result;
int16_t ErrorCode;
rd_kafka_topic_partition_list_t *partitions = NULL;
const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets =
rd_list_elem(&rko_req->rko_u.admin_request.args, 0);
const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;

rd_kafka_buf_read_i16(reply, &ErrorCode);
if (ErrorCode) {
Expand All @@ -3924,6 +3959,8 @@ rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req,

/* Create result op and group_result_t */
rko_result = rd_kafka_admin_result_new(rko_req);
del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0);

rd_list_init(&rko_result->rko_u.admin_result.results, 1,
rd_kafka_group_result_free);
rd_list_add(&rko_result->rko_u.admin_result.results,
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,12 @@ struct rd_kafka_op_s {
rd_kafka_op_type_t reqtype; /**< Request op type,
* used for logging. */

rd_list_t args; /**< Args moved from the request op
* when the result op is created.
*
* Type depends on request.
*/

char *errstr; /**< Error string, if rko_err
* is set, else NULL. */

Expand Down
16 changes: 16 additions & 0 deletions src/rdlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,22 @@ void *rd_list_copy_preallocated (const void *elem, void *opaque) {
}



void rd_list_move (rd_list_t *dst, rd_list_t *src) {
rd_list_init_copy(dst, src);

if (src->rl_flags & RD_LIST_F_FIXED_SIZE) {
rd_list_copy_preallocated0(dst, src);
} else {
memcpy(dst->rl_elems, src->rl_elems,
src->rl_cnt * sizeof(*src->rl_elems));
dst->rl_cnt = src->rl_cnt;
}

src->rl_cnt = 0;
}


/**
* @name Misc helpers for common list types
* @{
Expand Down
9 changes: 9 additions & 0 deletions src/rdlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,15 @@ void *rd_list_string_copy (const void *elem, void *opaque) {



/**
* @brief Move elements from \p src to \p dst.
*
* @remark \p dst will be initialized first.
* @remark \p src will be emptied.
*/
void rd_list_move (rd_list_t *dst, rd_list_t *src);


/**
* @name Misc helpers for common list types
* @{
Expand Down

0 comments on commit ca1b30e

Please sign in to comment.