From 6ef28b0fc6e527c0f01c50b36f9cec269b3c9734 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Thu, 28 Sep 2023 21:32:07 +0530 Subject: [PATCH 01/10] Added topic id to describe_topic response. Co-authored-by: mahajanadhitya <115617755+mahajanadhitya@users.noreply.github.com> --- examples/describe_topics.c | 22 +++++++++++------ src/rdkafka.c | 35 ++++++++++++++++++++++++++ src/rdkafka.h | 50 ++++++++++++++++++++++++++++++++++++++ src/rdkafka_admin.c | 21 ++++++++++++---- src/rdkafka_admin.h | 7 +++--- src/rdkafka_proto.h | 47 ++++++++++++++--------------------- 6 files changed, 137 insertions(+), 45 deletions(-) diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 7008693d82..78e9cd05de 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -198,16 +198,21 @@ print_partition_info(const rd_kafka_TopicPartitionInfo_t *partition) { */ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { size_t j; - const rd_kafka_error_t *error; - const char *topic_name = rd_kafka_TopicDescription_name(topic); - error = rd_kafka_TopicDescription_error(topic); + const char *topic_name = rd_kafka_TopicDescription_name(topic); + const rd_kafka_error_t *error = rd_kafka_TopicDescription_error(topic); const rd_kafka_AclOperation_t *authorized_operations; size_t authorized_operations_cnt; const rd_kafka_TopicPartitionInfo_t **partitions; size_t partition_cnt; + rd_kafka_uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); + char *topic_id_str = rd_kafka_uuid_base64str(topic_id); + int64_t topic_id_msb = rd_kafka_uuid_most_significant_bits(topic_id); + int64_t topic_id_lsb = rd_kafka_uuid_least_significant_bits(topic_id); if (rd_kafka_error_code(error)) { - printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name, + printf("Topic: %s [LSB : %" PRId64 "MSB : %" PRId64 + " Base64String : %s] has error[%" PRId32 "]: %s\n", + topic_name, topic_id_lsb, topic_id_msb, topic_id_str, rd_kafka_error_code(error), rd_kafka_error_string(error)); return; @@ -216,10 +221,11 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { authorized_operations = rd_kafka_TopicDescription_authorized_operations( topic, &authorized_operations_cnt); - printf( - "Topic: %s succeeded, has %d topic authorized operations " - "allowed, they are:\n", - topic_name, (int)authorized_operations_cnt); + printf("Topic: %s [LSB : %" PRId64 " MSB : %" PRId64 + " Base64String : %s] succeeded, has %ld authorized operations " + "allowed, they are:\n", + topic_name, topic_id_lsb, topic_id_msb, topic_id_str, + authorized_operations_cnt); for (j = 0; j < authorized_operations_cnt; j++) printf("\t%s operation is allowed\n", diff --git a/src/rdkafka.c b/src/rdkafka.c index 6401e3520e..66d548329a 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5027,3 +5027,38 @@ int rd_kafka_errno(void) { int rd_kafka_unittest(void) { return rd_unittest(); } + +char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { + if (*uuid->base64str) + return uuid->base64str; + + rd_chariov_t in_base64; + char *out_base64_str; + char *uuid_bytes; + uint64_t input_uuid[2]; + + input_uuid[0] = htobe64(uuid->most_significant_bits); + input_uuid[1] = htobe64(uuid->least_significant_bits); + uuid_bytes = (char *)input_uuid; + in_base64.ptr = uuid_bytes; + in_base64.size = sizeof(uuid->most_significant_bits) + + sizeof(uuid->least_significant_bits); + + out_base64_str = rd_base64_encode_str(&in_base64); + if (!out_base64_str) + return NULL; + + rd_strlcpy(uuid->base64str, out_base64_str, + 23 /* Removing extra ('=') padding */); + rd_free(out_base64_str); + return uuid->base64str; +} + +int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid) { + return uuid->least_significant_bits; +} + + +int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid) { + return uuid->most_significant_bits; +} \ No newline at end of file diff --git a/src/rdkafka.h b/src/rdkafka.h index d66f242307..dc624a7206 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -262,6 +262,7 @@ typedef struct rd_kafka_error_s rd_kafka_error_t; typedef struct rd_kafka_headers_s rd_kafka_headers_t; typedef struct rd_kafka_group_result_s rd_kafka_group_result_t; typedef struct rd_kafka_acl_result_s rd_kafka_acl_result_t; +typedef struct rd_kafka_uuid_s rd_kafka_uuid_t; /* @endcond */ @@ -1631,6 +1632,43 @@ rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage); /**@}*/ +/** + * @name UUID + * @{ + * + */ + +/** + * @brief Computes base64 encoding for the given uuid string. + * @param uuid UUID for which base64 encoding is required. + * + * @return base64 encoded string for the given UUID or NULL in case of some + * issue with the conversion or the conversion is not supported. + */ +RD_EXPORT char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid); + +/** + * @brief Gets least significant 64 bits for the given UUID. + * + * @param uuid UUID + * + * @return least significant 64 bits for the given UUID. + */ +RD_EXPORT int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid); + + +/** + * @brief Gets most significant 64 bits for the given UUID. + * + * @param uuid UUID + * + * @return most significant 64 bits for the given UUID. + */ +RD_EXPORT int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid); + +/**@}*/ + + /** * @name Configuration interface * @{ @@ -8172,6 +8210,18 @@ RD_EXPORT const char * rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc); +/** + * @brief Gets the topic id for the \p topicdesc topic. + * + * @param topicdesc The topic description. + * @return The topic id + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p topicdesc object. + */ +RD_EXPORT rd_kafka_uuid_t *rd_kafka_TopicDescription_topic_id( + const rd_kafka_TopicDescription_t *topicdesc); + /** * @brief Gets if the \p topicdesc topic is internal. * diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 3107332a7f..a8f6a636bc 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8010,6 +8010,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { * @brief Create a new TopicDescription object. * * @param topic topic name + * @param topic_id topic id * @param partitions Array of partition metadata (rd_kafka_metadata_partition). * @param partition_cnt Number of partitions in partition metadata. * @param authorized_operations acl operations allowed for topic. @@ -8019,6 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { */ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const char *topic, + const rd_kafka_uuid_t *topic_id, const struct rd_kafka_metadata_partition *partitions, int partition_cnt, const struct rd_kafka_metadata_broker *brokers_sorted, @@ -8032,6 +8034,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( int i; topicdesc = rd_calloc(1, sizeof(*topicdesc)); topicdesc->topic = rd_strdup(topic); + topicdesc->topic_id = rd_kafka_uuid_copy(topic_id); topicdesc->partition_cnt = partition_cnt; topicdesc->is_internal = is_internal; if (error) @@ -8063,9 +8066,10 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( */ static rd_kafka_TopicDescription_t * rd_kafka_TopicDescription_new_error(const char *topic, + const rd_kafka_uuid_t *topic_id, rd_kafka_error_t *error) { - return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, NULL, 0, - NULL, 0, rd_false, error); + return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL, + NULL, 0, NULL, 0, rd_false, error); } static void @@ -8075,7 +8079,7 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) { RD_IF_FREE(topicdesc->topic, rd_free); RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy); RD_IF_FREE(topicdesc->authorized_operations, rd_free); - + RD_IF_FREE(topicdesc->topic_id, rd_kafka_uuid_destroy); for (i = 0; i < topicdesc->partition_cnt; i++) rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]); rd_free(topicdesc->partitions); @@ -8142,6 +8146,11 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc) { return topicdesc->error; } +rd_kafka_uuid_t *rd_kafka_TopicDescription_topic_id( + const rd_kafka_TopicDescription_t *topicdesc) { + return topicdesc->topic_id; +} + const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics( const rd_kafka_DescribeTopics_result_t *result, size_t *cntp) { @@ -8240,7 +8249,8 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, mdi->topics[i].topic_authorized_operations, &authorized_operation_cnt); topicdesc = rd_kafka_TopicDescription_new( - md->topics[i].topic, md->topics[i].partitions, + md->topics[i].topic, &mdi->topics[i].topic_id, + md->topics[i].partitions, md->topics[i].partition_cnt, mdi->brokers_sorted, mdi->brokers, md->broker_cnt, authorized_operations, authorized_operation_cnt, @@ -8251,7 +8261,8 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, md->topics[i].err, "%s", rd_kafka_err2str(md->topics[i].err)); topicdesc = rd_kafka_TopicDescription_new_error( - md->topics[i].topic, error); + md->topics[i].topic, &mdi->topics[i].topic_id, + error); rd_kafka_error_destroy(error); } orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args, diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 4eb015fad0..92ed259be9 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -525,9 +525,10 @@ struct rd_kafka_TopicPartitionInfo_s { * @struct DescribeTopics result */ struct rd_kafka_TopicDescription_s { - char *topic; /**< Topic name */ - int partition_cnt; /**< Number of partitions in \p partitions*/ - rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ + char *topic; /**< Topic name */ + rd_kafka_uuid_t *topic_id; /**< Topic Id */ + int partition_cnt; /**< Number of partitions in \p partitions*/ + rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ rd_kafka_error_t *error; /**< Topic error reported by broker */ int authorized_operations_cnt; /**< Count of operations allowed for diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 6ee948f2cb..561d4e872f 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -596,7 +596,7 @@ typedef struct rd_kafka_uuid_s { * @return A newly allocated UUID. */ static RD_INLINE RD_UNUSED rd_kafka_uuid_t *rd_kafka_uuid_new() { - rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t *)); + rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); return uuid; } @@ -610,39 +610,28 @@ static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) { } /** - * @brief Computes base64 encoding for the given uuid string. - * @param uuid UUID for which base64 encoding is required. + * Returns a newly allocated copy of the given UUID. * - * @return base64 encoded string for the given UUID or NULL in case of some - * issue with the conversion or the conversion is not supported. + * @param uuid UUID to copy. + * @return Copy of the provided UUID. + * + * @remark Dynamically allocated. Deallocate (free) after use. */ -static RD_INLINE RD_UNUSED char * -rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { +static RD_INLINE RD_UNUSED rd_kafka_uuid_t * +rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { + rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new(); + copy_uuid->most_significant_bits = uuid->most_significant_bits; + copy_uuid->least_significant_bits = uuid->least_significant_bits; if (*uuid->base64str) - return uuid->base64str; - - rd_chariov_t in_base64; - char *out_base64_str; - char *uuid_bytes; - uint64_t input_uuid[2]; - - input_uuid[0] = htobe64(uuid->most_significant_bits); - input_uuid[1] = htobe64(uuid->least_significant_bits); - uuid_bytes = (char *)input_uuid; - in_base64.ptr = uuid_bytes; - in_base64.size = sizeof(uuid->most_significant_bits) + - sizeof(uuid->least_significant_bits); - - out_base64_str = rd_base64_encode_str(&in_base64); - if (!out_base64_str) - return NULL; - - rd_strlcpy(uuid->base64str, out_base64_str, - 23 /* Removing extra ('=') padding */); - rd_free(out_base64_str); - return uuid->base64str; + memcpy(copy_uuid->base64str, uuid->base64str, 23); + return copy_uuid; } +/** + * @brief Destroy the provided uuid. + * + * @param uuid UUID + */ static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) { rd_free(uuid); } From d1e078dadb094df25f32d45ebef72709377dfcfc Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Fri, 29 Sep 2023 03:00:04 +0530 Subject: [PATCH 02/10] Some refactors --- examples/describe_topics.c | 17 ++++++----------- src/rdkafka_admin.c | 4 ++-- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 78e9cd05de..d74e3c78f6 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -206,14 +206,10 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { size_t partition_cnt; rd_kafka_uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); char *topic_id_str = rd_kafka_uuid_base64str(topic_id); - int64_t topic_id_msb = rd_kafka_uuid_most_significant_bits(topic_id); - int64_t topic_id_lsb = rd_kafka_uuid_least_significant_bits(topic_id); if (rd_kafka_error_code(error)) { - printf("Topic: %s [LSB : %" PRId64 "MSB : %" PRId64 - " Base64String : %s] has error[%" PRId32 "]: %s\n", - topic_name, topic_id_lsb, topic_id_msb, topic_id_str, - rd_kafka_error_code(error), + printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n", + topic_name, topic_id_str, rd_kafka_error_code(error), rd_kafka_error_string(error)); return; } @@ -221,11 +217,10 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { authorized_operations = rd_kafka_TopicDescription_authorized_operations( topic, &authorized_operations_cnt); - printf("Topic: %s [LSB : %" PRId64 " MSB : %" PRId64 - " Base64String : %s] succeeded, has %ld authorized operations " - "allowed, they are:\n", - topic_name, topic_id_lsb, topic_id_msb, topic_id_str, - authorized_operations_cnt); + printf( + "Topic: %s (Topic Id: %s) succeeded, has %ld authorized operations " + "allowed, they are:\n", + topic_name, topic_id_str, authorized_operations_cnt); for (j = 0; j < authorized_operations_cnt; j++) printf("\t%s operation is allowed\n", diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index a8f6a636bc..061a1c88b4 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8020,7 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { */ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const char *topic, - const rd_kafka_uuid_t *topic_id, + rd_kafka_uuid_t *topic_id, const struct rd_kafka_metadata_partition *partitions, int partition_cnt, const struct rd_kafka_metadata_broker *brokers_sorted, @@ -8066,7 +8066,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( */ static rd_kafka_TopicDescription_t * rd_kafka_TopicDescription_new_error(const char *topic, - const rd_kafka_uuid_t *topic_id, + rd_kafka_uuid_t *topic_id, rd_kafka_error_t *error) { return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL, NULL, 0, NULL, 0, rd_false, error); From e38bd3cbf29b280e80c7f66d68ab1419311ddfc0 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 4 Oct 2023 05:15:29 +0530 Subject: [PATCH 03/10] Added public initializer for Uuid. Exposed some more functions related to Uuid --- src/rdkafka.c | 39 +++++++++++++++++++++++++++++++++++++ src/rdkafka.h | 29 ++++++++++++++++++++++++++++ src/rdkafka_proto.h | 47 --------------------------------------------- 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 66d548329a..732408bc87 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5028,6 +5028,45 @@ int rd_kafka_unittest(void) { return rd_unittest(); } + +/** + * Creates a new UUID. + * + * @return A newly allocated UUID. + */ +rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, int64_t least_significant_bits) { + rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); + uuid->most_significant_bits = most_significant_bits; + uuid->least_significant_bits = least_significant_bits; + return uuid; +} + +/** + * Returns a newly allocated copy of the given UUID. + * + * @param uuid UUID to copy. + * @return Copy of the provided UUID. + * + * @remark Dynamically allocated. Deallocate (free) after use. + */ +rd_kafka_uuid_t * +rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { + rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new(uuid->most_significant_bits, + uuid->least_significant_bits); + if (*uuid->base64str) + memcpy(copy_uuid->base64str, uuid->base64str, 23); + return copy_uuid; +} + +/** + * @brief Destroy the provided uuid. + * + * @param uuid UUID + */ +void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) { + rd_free(uuid); +} + char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { if (*uuid->base64str) return uuid->base64str; diff --git a/src/rdkafka.h b/src/rdkafka.h index dc624a7206..cb5ae0f3bf 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -1666,6 +1666,35 @@ RD_EXPORT int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid); */ RD_EXPORT int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid); + +/** + * @brief Creates a new UUID. + * + * @param most_significant_bits most significant 64 bits of the 128 bits UUID. + * @param least_significant_bits least significant 64 bits of the 128 bits UUID. + * + * @return A newly allocated UUID. + * @remark Must be freed after use using rd_kafka_uuid_destroy() + */ +RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, int64_t least_significant_bits); + +/** + * @brief Copies the given UUID. + * + * @param uuid UUID to be copied. + * + * @return A newly allocated copy of the provided UUID. + * @remark Must be freed after use using rd_kafka_uuid_destroy() + */ +RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid); + +/** + * @brief Destroy the provided uuid. + * + * @param uuid UUID + */ +RD_EXPORT void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid); + /**@}*/ diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 561d4e872f..645a8eafa2 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -590,53 +590,6 @@ typedef struct rd_kafka_uuid_s { { 0, 1, "" } -/** - * Creates a new UUID. - * - * @return A newly allocated UUID. - */ -static RD_INLINE RD_UNUSED rd_kafka_uuid_t *rd_kafka_uuid_new() { - rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); - return uuid; -} - -/** - * Initialize given UUID to zero UUID. - * - * @param uuid UUID to initialize. - */ -static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) { - memset(uuid, 0, sizeof(*uuid)); -} - -/** - * Returns a newly allocated copy of the given UUID. - * - * @param uuid UUID to copy. - * @return Copy of the provided UUID. - * - * @remark Dynamically allocated. Deallocate (free) after use. - */ -static RD_INLINE RD_UNUSED rd_kafka_uuid_t * -rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { - rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new(); - copy_uuid->most_significant_bits = uuid->most_significant_bits; - copy_uuid->least_significant_bits = uuid->least_significant_bits; - if (*uuid->base64str) - memcpy(copy_uuid->base64str, uuid->base64str, 23); - return copy_uuid; -} - -/** - * @brief Destroy the provided uuid. - * - * @param uuid UUID - */ -static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) { - rd_free(uuid); -} - - /** * @name Producer ID and Epoch for the Idempotent Producer * @{ From 1442063aacceceb1c4f10554674822834df2d42a Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 4 Oct 2023 05:16:38 +0530 Subject: [PATCH 04/10] Style fixes --- src/rdkafka.c | 14 +++++++------- src/rdkafka.h | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 732408bc87..7414b8f2c8 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5034,9 +5034,10 @@ int rd_kafka_unittest(void) { * * @return A newly allocated UUID. */ -rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, int64_t least_significant_bits) { - rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); - uuid->most_significant_bits = most_significant_bits; +rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, + int64_t least_significant_bits) { + rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); + uuid->most_significant_bits = most_significant_bits; uuid->least_significant_bits = least_significant_bits; return uuid; } @@ -5049,10 +5050,9 @@ rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, int64_t least_ * * @remark Dynamically allocated. Deallocate (free) after use. */ -rd_kafka_uuid_t * -rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { - rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new(uuid->most_significant_bits, - uuid->least_significant_bits); +rd_kafka_uuid_t *rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { + rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new( + uuid->most_significant_bits, uuid->least_significant_bits); if (*uuid->base64str) memcpy(copy_uuid->base64str, uuid->base64str, 23); return copy_uuid; diff --git a/src/rdkafka.h b/src/rdkafka.h index cb5ae0f3bf..55df3261d5 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -1676,7 +1676,8 @@ RD_EXPORT int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid); * @return A newly allocated UUID. * @remark Must be freed after use using rd_kafka_uuid_destroy() */ -RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, int64_t least_significant_bits); +RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, + int64_t least_significant_bits); /** * @brief Copies the given UUID. From 902a517f0fe6bbf626e63c38df9ad1e88118f980 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 10 Oct 2023 00:40:44 +0530 Subject: [PATCH 05/10] Changed uuid to Uuid --- examples/describe_topics.c | 4 ++-- src/rdkafka.c | 16 ++++++++-------- src/rdkafka.h | 20 ++++++++++---------- src/rdkafka_admin.c | 10 +++++----- src/rdkafka_admin.h | 2 +- src/rdkafka_buf.h | 2 +- src/rdkafka_metadata.h | 2 +- src/rdkafka_proto.h | 6 +++--- src/rdkafka_request.c | 2 +- 9 files changed, 32 insertions(+), 32 deletions(-) diff --git a/examples/describe_topics.c b/examples/describe_topics.c index d74e3c78f6..7e47f9b4a2 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -204,8 +204,8 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { size_t authorized_operations_cnt; const rd_kafka_TopicPartitionInfo_t **partitions; size_t partition_cnt; - rd_kafka_uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); - char *topic_id_str = rd_kafka_uuid_base64str(topic_id); + rd_kafka_Uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); + char *topic_id_str = rd_kafka_Uuid_base64str(topic_id); if (rd_kafka_error_code(error)) { printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n", diff --git a/src/rdkafka.c b/src/rdkafka.c index 7414b8f2c8..99684bfb15 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5034,9 +5034,9 @@ int rd_kafka_unittest(void) { * * @return A newly allocated UUID. */ -rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, +rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits, int64_t least_significant_bits) { - rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t)); + rd_kafka_Uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_Uuid_t)); uuid->most_significant_bits = most_significant_bits; uuid->least_significant_bits = least_significant_bits; return uuid; @@ -5050,8 +5050,8 @@ rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, * * @remark Dynamically allocated. Deallocate (free) after use. */ -rd_kafka_uuid_t *rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { - rd_kafka_uuid_t *copy_uuid = rd_kafka_uuid_new( +rd_kafka_Uuid_t *rd_kafka_Uuid_copy(rd_kafka_Uuid_t *uuid) { + rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new( uuid->most_significant_bits, uuid->least_significant_bits); if (*uuid->base64str) memcpy(copy_uuid->base64str, uuid->base64str, 23); @@ -5063,11 +5063,11 @@ rd_kafka_uuid_t *rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid) { * * @param uuid UUID */ -void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) { +void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) { rd_free(uuid); } -char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { +char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid) { if (*uuid->base64str) return uuid->base64str; @@ -5093,11 +5093,11 @@ char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) { return uuid->base64str; } -int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid) { +int64_t rd_kafka_Uuid_least_significant_bits(rd_kafka_Uuid_t *uuid) { return uuid->least_significant_bits; } -int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid) { +int64_t rd_kafka_Uuid_most_significant_bits(rd_kafka_Uuid_t *uuid) { return uuid->most_significant_bits; } \ No newline at end of file diff --git a/src/rdkafka.h b/src/rdkafka.h index 55df3261d5..f94c220cdb 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -262,7 +262,7 @@ typedef struct rd_kafka_error_s rd_kafka_error_t; typedef struct rd_kafka_headers_s rd_kafka_headers_t; typedef struct rd_kafka_group_result_s rd_kafka_group_result_t; typedef struct rd_kafka_acl_result_s rd_kafka_acl_result_t; -typedef struct rd_kafka_uuid_s rd_kafka_uuid_t; +typedef struct rd_kafka_Uuid_s rd_kafka_Uuid_t; /* @endcond */ @@ -1645,7 +1645,7 @@ rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage); * @return base64 encoded string for the given UUID or NULL in case of some * issue with the conversion or the conversion is not supported. */ -RD_EXPORT char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid); +RD_EXPORT char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid); /** * @brief Gets least significant 64 bits for the given UUID. @@ -1654,7 +1654,7 @@ RD_EXPORT char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid); * * @return least significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid); +RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(rd_kafka_Uuid_t *uuid); /** @@ -1664,7 +1664,7 @@ RD_EXPORT int64_t rd_kafka_uuid_least_significant_bits(rd_kafka_uuid_t *uuid); * * @return most significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid); +RD_EXPORT int64_t rd_kafka_Uuid_most_significant_bits(rd_kafka_Uuid_t *uuid); /** @@ -1674,9 +1674,9 @@ RD_EXPORT int64_t rd_kafka_uuid_most_significant_bits(rd_kafka_uuid_t *uuid); * @param least_significant_bits least significant 64 bits of the 128 bits UUID. * * @return A newly allocated UUID. - * @remark Must be freed after use using rd_kafka_uuid_destroy() + * @remark Must be freed after use using rd_kafka_Uuid_destroy() */ -RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, +RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits, int64_t least_significant_bits); /** @@ -1685,16 +1685,16 @@ RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_new(int64_t most_significant_bits, * @param uuid UUID to be copied. * * @return A newly allocated copy of the provided UUID. - * @remark Must be freed after use using rd_kafka_uuid_destroy() + * @remark Must be freed after use using rd_kafka_Uuid_destroy() */ -RD_EXPORT rd_kafka_uuid_t *rd_kafka_uuid_copy(rd_kafka_uuid_t *uuid); +RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_copy(rd_kafka_Uuid_t *uuid); /** * @brief Destroy the provided uuid. * * @param uuid UUID */ -RD_EXPORT void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid); +RD_EXPORT void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid); /**@}*/ @@ -8249,7 +8249,7 @@ rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc); * @remark The lifetime of the returned memory is the same * as the lifetime of the \p topicdesc object. */ -RD_EXPORT rd_kafka_uuid_t *rd_kafka_TopicDescription_topic_id( +RD_EXPORT rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( const rd_kafka_TopicDescription_t *topicdesc); /** diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 061a1c88b4..4b9a8becc0 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8020,7 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { */ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const char *topic, - rd_kafka_uuid_t *topic_id, + rd_kafka_Uuid_t *topic_id, const struct rd_kafka_metadata_partition *partitions, int partition_cnt, const struct rd_kafka_metadata_broker *brokers_sorted, @@ -8034,7 +8034,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( int i; topicdesc = rd_calloc(1, sizeof(*topicdesc)); topicdesc->topic = rd_strdup(topic); - topicdesc->topic_id = rd_kafka_uuid_copy(topic_id); + topicdesc->topic_id = rd_kafka_Uuid_copy(topic_id); topicdesc->partition_cnt = partition_cnt; topicdesc->is_internal = is_internal; if (error) @@ -8066,7 +8066,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( */ static rd_kafka_TopicDescription_t * rd_kafka_TopicDescription_new_error(const char *topic, - rd_kafka_uuid_t *topic_id, + rd_kafka_Uuid_t *topic_id, rd_kafka_error_t *error) { return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL, NULL, 0, NULL, 0, rd_false, error); @@ -8079,7 +8079,7 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) { RD_IF_FREE(topicdesc->topic, rd_free); RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy); RD_IF_FREE(topicdesc->authorized_operations, rd_free); - RD_IF_FREE(topicdesc->topic_id, rd_kafka_uuid_destroy); + RD_IF_FREE(topicdesc->topic_id, rd_kafka_Uuid_destroy); for (i = 0; i < topicdesc->partition_cnt; i++) rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]); rd_free(topicdesc->partitions); @@ -8146,7 +8146,7 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc) { return topicdesc->error; } -rd_kafka_uuid_t *rd_kafka_TopicDescription_topic_id( +rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( const rd_kafka_TopicDescription_t *topicdesc) { return topicdesc->topic_id; } diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 92ed259be9..d634b9bb74 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -526,7 +526,7 @@ struct rd_kafka_TopicPartitionInfo_s { */ struct rd_kafka_TopicDescription_s { char *topic; /**< Topic name */ - rd_kafka_uuid_t *topic_id; /**< Topic Id */ + rd_kafka_Uuid_t *topic_id; /**< Topic Id */ int partition_cnt; /**< Number of partitions in \p partitions*/ rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index 623ec49ae0..b6568b0ca9 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -1454,7 +1454,7 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf, } while (0) static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf, - rd_kafka_uuid_t *uuid) { + rd_kafka_Uuid_t *uuid) { rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits); rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits); } diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 2b81e0ddec..ded83bb14c 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -54,7 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s { * same count as metadata.topics[i].partition_cnt. * Sorted by Partition Id. */ rd_kafka_metadata_partition_internal_t *partitions; - rd_kafka_uuid_t topic_id; + rd_kafka_Uuid_t topic_id; int32_t topic_authorized_operations; /**< ACL operations allowed * for topic, -1 if not * supported by broker */ diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 645a8eafa2..e6caf509e3 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -572,16 +572,16 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t; /** * @struct Struct representing UUID protocol primitive type. */ -typedef struct rd_kafka_uuid_s { +typedef struct rd_kafka_Uuid_s { int64_t most_significant_bits; /**< Most significant 64 bits for the UUID */ int64_t least_significant_bits; /**< Least significant 64 bits for the UUID */ char base64str[23]; /**< base64 encoding for the uuid. By default, it is lazy loaded. Use function - `rd_kafka_uuid_base64str()` as a getter for this + `rd_kafka_Uuid_base64str()` as a getter for this field. */ -} rd_kafka_uuid_t; +} rd_kafka_Uuid_t; #define RD_KAFKA_UUID_ZERO \ { 0, 0, "" } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 46c9ed4cc3..ca99349e46 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2321,7 +2321,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, if (topic_cnt > 0) { char *topic; int i; - rd_kafka_uuid_t zero_uuid = RD_KAFKA_UUID_ZERO; + rd_kafka_Uuid_t zero_uuid = RD_KAFKA_UUID_ZERO; /* Maintain a copy of the topics list so we can purge * hints from the metadata cache on error. */ From 45e3798e0edde134e62a5f7bdb65007653ebbc4b Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 10 Oct 2023 01:42:27 +0530 Subject: [PATCH 06/10] Added test for topic id --- tests/0081-admin.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 3107c048b0..32d89cd14c 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3169,6 +3169,7 @@ static void do_test_DescribeTopics(const char *what, const rd_kafka_DescribeTopics_result_t *res; const rd_kafka_TopicDescription_t **result_topics; const rd_kafka_TopicPartitionInfo_t **partitions; + rd_kafka_Uuid_t *topic_id; size_t partitions_cnt; size_t result_topics_cnt; char errstr[128]; @@ -3261,6 +3262,10 @@ static void do_test_DescribeTopics(const char *what, "Expected topic name %s, got %s", topic_names[0], rd_kafka_TopicDescription_name(result_topics[0])); + topic_id = rd_kafka_TopicDescription_topic_id(result_topics[0]); + + TEST_ASSERT(topic_id, "Expected Topic Id to present."); + partitions = rd_kafka_TopicDescription_partitions(result_topics[0], &partitions_cnt); From 269f8d7de29404ad0eecbe187dd05739d0149ec5 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Tue, 10 Oct 2023 16:11:11 +0530 Subject: [PATCH 07/10] Added base64 encoding function when OpenSSL is not available. --- src/rdbase64.c | 49 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/src/rdbase64.c b/src/rdbase64.c index d81858418f..4a342ce581 100644 --- a/src/rdbase64.c +++ b/src/rdbase64.c @@ -30,6 +30,46 @@ #if WITH_SSL #include +#else + +# define conv_bin2ascii(a, table) ((table)[(a)&0x3f]) + +static const unsigned char data_bin2ascii[65] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +static int base64_encoding_conversion(unsigned char *out, + const unsigned char *in, + int dlen) +{ + int i, ret = 0; + unsigned long l; + + for (i = dlen; i > 0; i -= 3) { + if (i >= 3) { + l = (((unsigned long)in[0]) << 16L) | + (((unsigned long)in[1]) << 8L) | in[2]; + *(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii); + *(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii); + *(out++) = conv_bin2ascii(l >> 6L, data_bin2ascii); + *(out++) = conv_bin2ascii(l, data_bin2ascii); + } else { + l = ((unsigned long)in[0]) << 16L; + if (i == 2) + l |= ((unsigned long)in[1] << 8L); + + *(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii); + *(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii); + *(out++) = (i == 1) ? '=' : conv_bin2ascii(l >> 6L, data_bin2ascii); + *(out++) = '='; + } + ret += 4; + in += 3; + } + + *out = '\0'; + return ret; +} + #endif /** @@ -41,7 +81,6 @@ */ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) { -#if WITH_SSL size_t max_len; /* OpenSSL takes an |int| argument so the input cannot exceed that. */ @@ -53,14 +92,16 @@ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) { max_len = (((in->size + 2) / 3) * 4) + 1; out->ptr = rd_malloc(max_len); +#if WITH_SSL out->size = EVP_EncodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr, (int)in->size); +#else + out->size = base64_encoding_conversion((unsigned char *)out->ptr, + (unsigned char *)in->ptr, (int)in->size); +#endif rd_assert(out->size < max_len); out->ptr[out->size] = 0; -#else - out->ptr = NULL; -#endif } From cd0009c5b481dce79e05c4aba6801da7b1039f83 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:36:10 +0530 Subject: [PATCH 08/10] PR comments --- examples/describe_topics.c | 4 ++-- src/rdkafka.c | 10 +++++----- src/rdkafka.h | 10 +++++----- src/rdkafka_admin.c | 15 +++++++-------- src/rdkafka_admin.h | 2 +- tests/0081-admin.c | 2 +- 6 files changed, 21 insertions(+), 22 deletions(-) diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 7e47f9b4a2..83ad38a8cd 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -204,8 +204,8 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { size_t authorized_operations_cnt; const rd_kafka_TopicPartitionInfo_t **partitions; size_t partition_cnt; - rd_kafka_Uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); - char *topic_id_str = rd_kafka_Uuid_base64str(topic_id); + const rd_kafka_Uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); + const char *topic_id_str = rd_kafka_Uuid_base64str(topic_id); if (rd_kafka_error_code(error)) { printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n", diff --git a/src/rdkafka.c b/src/rdkafka.c index 99684bfb15..fd5ea3a61a 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5050,7 +5050,7 @@ rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits, * * @remark Dynamically allocated. Deallocate (free) after use. */ -rd_kafka_Uuid_t *rd_kafka_Uuid_copy(rd_kafka_Uuid_t *uuid) { +rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) { rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new( uuid->most_significant_bits, uuid->least_significant_bits); if (*uuid->base64str) @@ -5067,7 +5067,7 @@ void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) { rd_free(uuid); } -char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid) { +const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) { if (*uuid->base64str) return uuid->base64str; @@ -5087,17 +5087,17 @@ char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid) { if (!out_base64_str) return NULL; - rd_strlcpy(uuid->base64str, out_base64_str, + rd_strlcpy((char *) uuid->base64str, out_base64_str, 23 /* Removing extra ('=') padding */); rd_free(out_base64_str); return uuid->base64str; } -int64_t rd_kafka_Uuid_least_significant_bits(rd_kafka_Uuid_t *uuid) { +int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) { return uuid->least_significant_bits; } -int64_t rd_kafka_Uuid_most_significant_bits(rd_kafka_Uuid_t *uuid) { +int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) { return uuid->most_significant_bits; } \ No newline at end of file diff --git a/src/rdkafka.h b/src/rdkafka.h index f94c220cdb..badb45bb11 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -1645,7 +1645,7 @@ rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage); * @return base64 encoded string for the given UUID or NULL in case of some * issue with the conversion or the conversion is not supported. */ -RD_EXPORT char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid); +RD_EXPORT const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid); /** * @brief Gets least significant 64 bits for the given UUID. @@ -1654,7 +1654,7 @@ RD_EXPORT char *rd_kafka_Uuid_base64str(rd_kafka_Uuid_t *uuid); * * @return least significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(rd_kafka_Uuid_t *uuid); +RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid); /** @@ -1664,7 +1664,7 @@ RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(rd_kafka_Uuid_t *uuid); * * @return most significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_Uuid_most_significant_bits(rd_kafka_Uuid_t *uuid); +RD_EXPORT int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid); /** @@ -1687,7 +1687,7 @@ RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits, * @return A newly allocated copy of the provided UUID. * @remark Must be freed after use using rd_kafka_Uuid_destroy() */ -RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_copy(rd_kafka_Uuid_t *uuid); +RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid); /** * @brief Destroy the provided uuid. @@ -8249,7 +8249,7 @@ rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc); * @remark The lifetime of the returned memory is the same * as the lifetime of the \p topicdesc object. */ -RD_EXPORT rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( +RD_EXPORT const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( const rd_kafka_TopicDescription_t *topicdesc); /** diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 4b9a8becc0..93e4e7d6d3 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -8020,7 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { */ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const char *topic, - rd_kafka_Uuid_t *topic_id, + rd_kafka_Uuid_t topic_id, const struct rd_kafka_metadata_partition *partitions, int partition_cnt, const struct rd_kafka_metadata_broker *brokers_sorted, @@ -8034,7 +8034,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( int i; topicdesc = rd_calloc(1, sizeof(*topicdesc)); topicdesc->topic = rd_strdup(topic); - topicdesc->topic_id = rd_kafka_Uuid_copy(topic_id); + topicdesc->topic_id = topic_id; topicdesc->partition_cnt = partition_cnt; topicdesc->is_internal = is_internal; if (error) @@ -8066,7 +8066,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( */ static rd_kafka_TopicDescription_t * rd_kafka_TopicDescription_new_error(const char *topic, - rd_kafka_Uuid_t *topic_id, + rd_kafka_Uuid_t topic_id, rd_kafka_error_t *error) { return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL, NULL, 0, NULL, 0, rd_false, error); @@ -8079,7 +8079,6 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) { RD_IF_FREE(topicdesc->topic, rd_free); RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy); RD_IF_FREE(topicdesc->authorized_operations, rd_free); - RD_IF_FREE(topicdesc->topic_id, rd_kafka_Uuid_destroy); for (i = 0; i < topicdesc->partition_cnt; i++) rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]); rd_free(topicdesc->partitions); @@ -8146,9 +8145,9 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc) { return topicdesc->error; } -rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( +const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id( const rd_kafka_TopicDescription_t *topicdesc) { - return topicdesc->topic_id; + return &topicdesc->topic_id; } const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics( @@ -8249,7 +8248,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, mdi->topics[i].topic_authorized_operations, &authorized_operation_cnt); topicdesc = rd_kafka_TopicDescription_new( - md->topics[i].topic, &mdi->topics[i].topic_id, + md->topics[i].topic, mdi->topics[i].topic_id, md->topics[i].partitions, md->topics[i].partition_cnt, mdi->brokers_sorted, mdi->brokers, md->broker_cnt, authorized_operations, @@ -8261,7 +8260,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, md->topics[i].err, "%s", rd_kafka_err2str(md->topics[i].err)); topicdesc = rd_kafka_TopicDescription_new_error( - md->topics[i].topic, &mdi->topics[i].topic_id, + md->topics[i].topic, mdi->topics[i].topic_id, error); rd_kafka_error_destroy(error); } diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index d634b9bb74..235727ed0f 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -526,7 +526,7 @@ struct rd_kafka_TopicPartitionInfo_s { */ struct rd_kafka_TopicDescription_s { char *topic; /**< Topic name */ - rd_kafka_Uuid_t *topic_id; /**< Topic Id */ + rd_kafka_Uuid_t topic_id; /**< Topic Id */ int partition_cnt; /**< Number of partitions in \p partitions*/ rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 32d89cd14c..c8c6fcc7ab 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -3169,7 +3169,7 @@ static void do_test_DescribeTopics(const char *what, const rd_kafka_DescribeTopics_result_t *res; const rd_kafka_TopicDescription_t **result_topics; const rd_kafka_TopicPartitionInfo_t **partitions; - rd_kafka_Uuid_t *topic_id; + const rd_kafka_Uuid_t *topic_id; size_t partitions_cnt; size_t result_topics_cnt; char errstr[128]; From 762b2c53a407c5f352436507a8dbf9cacd3e8482 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:40:18 +0530 Subject: [PATCH 09/10] Style fixes --- examples/describe_topics.c | 5 +++-- src/rdbase64.c | 13 +++++++------ src/rdkafka.c | 2 +- src/rdkafka.h | 6 ++++-- src/rdkafka_admin.h | 6 +++--- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/examples/describe_topics.c b/examples/describe_topics.c index 83ad38a8cd..cf38a70e21 100644 --- a/examples/describe_topics.c +++ b/examples/describe_topics.c @@ -204,8 +204,9 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) { size_t authorized_operations_cnt; const rd_kafka_TopicPartitionInfo_t **partitions; size_t partition_cnt; - const rd_kafka_Uuid_t *topic_id = rd_kafka_TopicDescription_topic_id(topic); - const char *topic_id_str = rd_kafka_Uuid_base64str(topic_id); + const rd_kafka_Uuid_t *topic_id = + rd_kafka_TopicDescription_topic_id(topic); + const char *topic_id_str = rd_kafka_Uuid_base64str(topic_id); if (rd_kafka_error_code(error)) { printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n", diff --git a/src/rdbase64.c b/src/rdbase64.c index 4a342ce581..aaf2fb138e 100644 --- a/src/rdbase64.c +++ b/src/rdbase64.c @@ -32,15 +32,14 @@ #include #else -# define conv_bin2ascii(a, table) ((table)[(a)&0x3f]) +#define conv_bin2ascii(a, table) ((table)[(a)&0x3f]) static const unsigned char data_bin2ascii[65] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; static int base64_encoding_conversion(unsigned char *out, const unsigned char *in, - int dlen) -{ + int dlen) { int i, ret = 0; unsigned long l; @@ -59,7 +58,9 @@ static int base64_encoding_conversion(unsigned char *out, *(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii); *(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii); - *(out++) = (i == 1) ? '=' : conv_bin2ascii(l >> 6L, data_bin2ascii); + *(out++) = + (i == 1) ? '=' + : conv_bin2ascii(l >> 6L, data_bin2ascii); *(out++) = '='; } ret += 4; @@ -96,8 +97,8 @@ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) { out->size = EVP_EncodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr, (int)in->size); #else - out->size = base64_encoding_conversion((unsigned char *)out->ptr, - (unsigned char *)in->ptr, (int)in->size); + out->size = base64_encoding_conversion( + (unsigned char *)out->ptr, (unsigned char *)in->ptr, (int)in->size); #endif rd_assert(out->size < max_len); diff --git a/src/rdkafka.c b/src/rdkafka.c index fd5ea3a61a..1098e17b17 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -5087,7 +5087,7 @@ const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) { if (!out_base64_str) return NULL; - rd_strlcpy((char *) uuid->base64str, out_base64_str, + rd_strlcpy((char *)uuid->base64str, out_base64_str, 23 /* Removing extra ('=') padding */); rd_free(out_base64_str); return uuid->base64str; diff --git a/src/rdkafka.h b/src/rdkafka.h index badb45bb11..0802d6507d 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -1654,7 +1654,8 @@ RD_EXPORT const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid); * * @return least significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid); +RD_EXPORT int64_t +rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid); /** @@ -1664,7 +1665,8 @@ RD_EXPORT int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uu * * @return most significant 64 bits for the given UUID. */ -RD_EXPORT int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid); +RD_EXPORT int64_t +rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid); /** diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 235727ed0f..3e7378af56 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -525,10 +525,10 @@ struct rd_kafka_TopicPartitionInfo_s { * @struct DescribeTopics result */ struct rd_kafka_TopicDescription_s { - char *topic; /**< Topic name */ + char *topic; /**< Topic name */ rd_kafka_Uuid_t topic_id; /**< Topic Id */ - int partition_cnt; /**< Number of partitions in \p partitions*/ - rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ + int partition_cnt; /**< Number of partitions in \p partitions*/ + rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */ rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */ rd_kafka_error_t *error; /**< Topic error reported by broker */ int authorized_operations_cnt; /**< Count of operations allowed for From fe8e720f0be9f5b2329a99f90d8d42a96163b2c4 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 16 Oct 2023 17:50:39 +0200 Subject: [PATCH 10/10] Changelog cleanup --- CHANGELOG.md | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25808865fd..9c6620c57e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,20 +2,17 @@ librdkafka v2.3.0 is a feature release: - * Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) - * Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` - (#4240, @jainruchir). - * [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses): - Return authorized operations in Describe Responses. - (#4240, @jainruchir). - * Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` + * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) + Partial support of topic identifiers. Topic identifiers in metadata response + available through the new `rd_kafka_DescribeTopics` function (#4300, #4451). + * [KIP-117](https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations) Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#4240, @jainruchir). * [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses): Return authorized operations in Describe Responses. (#4240, @jainruchir). * [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the - maximum backoff, with 20% jitter(#4422). + maximum backoff, with 20% jitter (#4422). * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0. * Add missing destroy that leads to leaking partition structure memory when there are partition leader changes and a stale leader epoch is received (#4429).