
Commit

Break into functions, update Supported KIPS
milindl committed Aug 2, 2023
1 parent 6f8aa0d commit 0bc97b3
Showing 2 changed files with 49 additions and 37 deletions.
INTRODUCTION.md (2 changes: 1 addition & 1 deletion)
@@ -1926,7 +1926,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-412 - AdminAPI: adjust log levels | 2.4.0 | Not supported |
| KIP-421 - Variables in client config files | 2.3.0 | Not applicable (librdkafka, et.al, does not provide a config file interface, and shouldn't) |
| KIP-429 - Consumer: incremental rebalance protocol | 2.4.0 | Supported |
| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0 | Not supported |
| KIP-430 - AdminAPI: return authorized operations in Describe.. responses | 2.3.0 | Supported |
| KIP-436 - Start time in stats | 2.3.0 | Supported |
| KIP-447 - Producer scalability for EOS | 2.5.0 | Supported |
| KIP-455 - AdminAPI: Replica assignment | 2.4.0 (WIP) | Not supported |
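The KIP-430 row above now reads Supported: Describe Admin results can carry the ACL operations the caller is authorized to perform. As a rough illustration of how an application might consume this, the sketch below reads the per-topic authorized operations from a DescribeTopics result event. rd_kafka_DescribeTopics_result_topics() appears in this commit's diff; the event accessor and rd_kafka_TopicDescription_authorized_operations() are assumed names from the public Admin API of this era, so treat the exact signatures as illustrative rather than authoritative.

```c
/* Sketch only (assumed accessor names, see note above): print the authorized
 * operations returned per topic once the caller has opted in through the
 * Admin options (include authorized operations). */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void print_authorized_operations(rd_kafka_event_t *rkev) {
        const rd_kafka_DescribeTopics_result_t *result =
            rd_kafka_event_DescribeTopics_result(rkev);
        const rd_kafka_TopicDescription_t **topics;
        size_t topic_cnt, i;

        topics = rd_kafka_DescribeTopics_result_topics(result, &topic_cnt);

        for (i = 0; i < topic_cnt; i++) {
                const rd_kafka_AclOperation_t *ops;
                size_t op_cnt, j;

                /* Assumed public accessor; internally this commit keeps the
                 * operations in topicdesc->authorized_operations. */
                ops = rd_kafka_TopicDescription_authorized_operations(
                    topics[i], &op_cnt);

                printf("topic #%zu: %zu authorized operation(s)\n", i, op_cnt);
                for (j = 0; j < op_cnt; j++)
                        printf("  operation code %d\n", (int)ops[j]);
        }
}
```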
src/rdkafka_admin.c (84 changes: 48 additions & 36 deletions)
@@ -7943,21 +7943,28 @@ const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics(
rko->rko_u.admin_result.results.rl_elems;
}

/**
* @brief Helper function to clear a rd_kafka_metadata_partition.
*
* @note Does not deallocate the rd_kafka_metadata_partition itself.
*/
static void
rd_kafka_metadata_partition_clear(struct rd_kafka_metadata_partition *rkmp) {
RD_IF_FREE(rkmp->isrs, rd_free);
RD_IF_FREE(rkmp->replicas, rd_free);
}

static void
rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
int i;
if (topicdesc->topic != NULL)
rd_free(topicdesc->topic);
if (topicdesc->error != NULL)
rd_kafka_error_destroy(topicdesc->error);
for (i = 0; i < topicdesc->partition_cnt; i++) {
if (topicdesc->partitions[i].isrs != NULL)
rd_free(topicdesc->partitions[i].isrs);
if (topicdesc->partitions[i].replicas != NULL)
rd_free(topicdesc->partitions[i].replicas);
}
if (topicdesc->partitions != NULL)
rd_free(topicdesc->partitions);

RD_IF_FREE(topicdesc->topic, rd_free);
RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy);

for (i = 0; i < topicdesc->partition_cnt; i++)
rd_kafka_metadata_partition_clear(&topicdesc->partitions[i]);

RD_IF_FREE(topicdesc->partitions, rd_free);
rd_list_destroy(&topicdesc->authorized_operations);
rd_free(topicdesc);
}
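The rewritten destructor above swaps the hand-rolled `if (x != NULL)` checks for librdkafka's RD_IF_FREE() convenience macro, which calls the supplied free function only when the pointer is non-NULL, and delegates per-partition cleanup to the new rd_kafka_metadata_partition_clear() helper. A minimal stand-alone sketch of the NULL-guarded-free pattern follows; the macro is re-defined here purely for illustration and only mirrors the library's internal definition.

```c
/* Stand-alone illustration of the NULL-guarded free pattern used by the
 * refactored destructor. The macro below mirrors (but is not) librdkafka's
 * internal RD_IF_FREE definition. */
#include <stdlib.h>
#include <string.h>

#define RD_IF_FREE(PTR, FREE_FN)                                               \
        do {                                                                   \
                if ((PTR))                                                     \
                        FREE_FN(PTR);                                          \
        } while (0)

int main(void) {
        char *topic = strdup("mytopic");
        char *error = NULL; /* nothing was allocated for the error */

        RD_IF_FREE(topic, free); /* pointer is non-NULL: freed */
        RD_IF_FREE(error, free); /* pointer is NULL: no-op */
        return 0;
}
```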
@@ -7973,6 +7980,31 @@ static int rd_kafka_DescribeTopics_cmp(const void *a, const void *b) {
return strcmp(a, b);
}

/**
* @brief Helper function to copy from one rd_kafka_metadata_partition to
* another.
*
* @note Both are assumed to be allocated.
*/
static void
rd_kafka_copy_metadata_partition(struct rd_kafka_metadata_partition *src,
struct rd_kafka_metadata_partition *dst) {
int i;
dst->err = src->err;
dst->id = src->id;
dst->isr_cnt = src->isr_cnt;
dst->isrs = rd_calloc(sizeof(int32_t), dst->isr_cnt);
for (i = 0; i < dst->isr_cnt; i++) {
dst->isrs[i] = src->isrs[i];
}
dst->leader = src->leader;
dst->replica_cnt = src->replica_cnt;
dst->replicas = rd_calloc(sizeof(int32_t), dst->replica_cnt);
for (i = 0; i < dst->replica_cnt; i++) {
dst->replicas[i] = src->replicas[i];
}
}

/**
* @brief Create a new TopicDescription object.
*
@@ -7991,7 +8023,7 @@ rd_kafka_TopicDescription_new(const char *topic,
const rd_list_t *topic_authorized_operations,
rd_kafka_error_t *error) {
rd_kafka_TopicDescription_t *topicdesc;
int i, j;
int i;
topicdesc = rd_calloc(1, sizeof(*topicdesc));
topicdesc->topic = rd_strdup(topic);
topicdesc->partition_cnt = partition_cnt;
@@ -8011,29 +8043,9 @@ if (partitions) {
if (partitions) {
topicdesc->partitions =
rd_calloc(sizeof(*partitions), partition_cnt);
for (i = 0; i < partition_cnt; i++) {
topicdesc->partitions[i].err = partitions[i].err;
topicdesc->partitions[i].id = partitions[i].id;
topicdesc->partitions[i].isr_cnt =
partitions[i].isr_cnt;
topicdesc->partitions[i].isrs = rd_calloc(
sizeof(int32_t), topicdesc->partitions[i].isr_cnt);
for (j = 0; j < topicdesc->partitions[i].isr_cnt; j++) {
topicdesc->partitions[i].isrs[j] =
partitions[i].isrs[j];
}
topicdesc->partitions[i].leader = partitions[i].leader;
topicdesc->partitions[i].replica_cnt =
partitions[i].replica_cnt;
topicdesc->partitions[i].replicas =
rd_calloc(sizeof(int32_t),
topicdesc->partitions[i].replica_cnt);
for (j = 0; j < topicdesc->partitions[i].replica_cnt;
j++) {
topicdesc->partitions[i].replicas[j] =
partitions[i].replicas[j];
}
}
for (i = 0; i < partition_cnt; i++)
rd_kafka_copy_metadata_partition(
&partitions[i], &topicdesc->partitions[i]);
}
return topicdesc;
}
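With the loop body factored out, rd_kafka_TopicDescription_new() now delegates the per-partition deep copy to rd_kafka_copy_metadata_partition(), and the destructor releases exactly those allocations through rd_kafka_metadata_partition_clear(). The stand-alone sketch below illustrates that ownership model with simplified stand-ins (a trimmed-down partition struct and plain malloc/free instead of rd_calloc/rd_free); it is not library code.

```c
/* Simplified copy/clear pair mirroring the ownership model of this commit:
 * the copy deep-copies the isrs/replicas arrays, the clear releases them
 * without freeing the struct itself. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct partition {
        int32_t id;
        int isr_cnt;
        int32_t *isrs;
        int replica_cnt;
        int32_t *replicas;
};

static void partition_copy(const struct partition *src, struct partition *dst) {
        dst->id      = src->id;
        dst->isr_cnt = src->isr_cnt;
        dst->isrs    = malloc(sizeof(int32_t) * src->isr_cnt);
        memcpy(dst->isrs, src->isrs, sizeof(int32_t) * src->isr_cnt);
        dst->replica_cnt = src->replica_cnt;
        dst->replicas    = malloc(sizeof(int32_t) * src->replica_cnt);
        memcpy(dst->replicas, src->replicas, sizeof(int32_t) * src->replica_cnt);
}

static void partition_clear(struct partition *p) {
        /* Releases only the arrays, never the struct itself, matching the
         * @note on rd_kafka_metadata_partition_clear(). */
        free(p->isrs);
        free(p->replicas);
}

int main(void) {
        int32_t brokers[] = {1, 2, 3};
        struct partition src = {0, 3, brokers, 3, brokers};
        struct partition dst;

        partition_copy(&src, &dst);
        printf("partition %d: %d ISRs, %d replicas (deep-copied)\n",
               (int)dst.id, dst.isr_cnt, dst.replica_cnt);
        partition_clear(&dst); /* frees everything the copy allocated */
        return 0;
}
```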
