
Added topic id to describe_topic response. #4451

Merged (11 commits) on Oct 16, 2023
16 changes: 9 additions & 7 deletions examples/describe_topics.c
@@ -198,17 +198,19 @@ print_partition_info(const rd_kafka_TopicPartitionInfo_t *partition) {
*/
static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
size_t j;
const rd_kafka_error_t *error;
const char *topic_name = rd_kafka_TopicDescription_name(topic);
error = rd_kafka_TopicDescription_error(topic);
const char *topic_name = rd_kafka_TopicDescription_name(topic);
const rd_kafka_error_t *error = rd_kafka_TopicDescription_error(topic);
const rd_kafka_AclOperation_t *authorized_operations;
size_t authorized_operations_cnt;
const rd_kafka_TopicPartitionInfo_t **partitions;
size_t partition_cnt;
const rd_kafka_Uuid_t *topic_id =
rd_kafka_TopicDescription_topic_id(topic);
const char *topic_id_str = rd_kafka_Uuid_base64str(topic_id);

if (rd_kafka_error_code(error)) {
printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name,
rd_kafka_error_code(error),
printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n",
topic_name, topic_id_str, rd_kafka_error_code(error),
rd_kafka_error_string(error));
return;
}
@@ -217,9 +219,9 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
topic, &authorized_operations_cnt);

printf(
"Topic: %s succeeded, has %d topic authorized operations "
"Topic: %s (Topic Id: %s) succeeded, has %ld authorized operations "
"allowed, they are:\n",
topic_name, (int)authorized_operations_cnt);
topic_name, topic_id_str, authorized_operations_cnt);

for (j = 0; j < authorized_operations_cnt; j++)
printf("\t%s operation is allowed\n",
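
For orientation, here is a minimal sketch (not part of the PR) of how the new topic-id getter combines with the existing DescribeTopics result accessors. Obtaining the result object itself, e.g. from the admin result event, is assumed to happen elsewhere, and the include path may differ depending on how the example is built:

```c
#include <stdio.h>
#include "rdkafka.h"

/* Print the name and base64 topic id of every topic in a DescribeTopics
 * result. */
static void print_topic_ids(const rd_kafka_DescribeTopics_result_t *result) {
        size_t cnt, i;
        const rd_kafka_TopicDescription_t **topics =
            rd_kafka_DescribeTopics_result_topics(result, &cnt);

        for (i = 0; i < cnt; i++) {
                const rd_kafka_Uuid_t *topic_id =
                    rd_kafka_TopicDescription_topic_id(topics[i]);
                /* The returned string is cached inside the Uuid object and
                 * must not be freed by the caller. */
                printf("%s -> %s\n",
                       rd_kafka_TopicDescription_name(topics[i]),
                       rd_kafka_Uuid_base64str(topic_id));
        }
}
```
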
50 changes: 46 additions & 4 deletions src/rdbase64.c
@@ -30,6 +30,47 @@

#if WITH_SSL
#include <openssl/ssl.h>
#else

#define conv_bin2ascii(a, table) ((table)[(a)&0x3f])

static const unsigned char data_bin2ascii[65] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

static int base64_encoding_conversion(unsigned char *out,
const unsigned char *in,
int dlen) {
int i, ret = 0;
unsigned long l;

for (i = dlen; i > 0; i -= 3) {
if (i >= 3) {
l = (((unsigned long)in[0]) << 16L) |
(((unsigned long)in[1]) << 8L) | in[2];
*(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii);
*(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii);
*(out++) = conv_bin2ascii(l >> 6L, data_bin2ascii);
*(out++) = conv_bin2ascii(l, data_bin2ascii);
} else {
l = ((unsigned long)in[0]) << 16L;
if (i == 2)
l |= ((unsigned long)in[1] << 8L);

*(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii);
*(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii);
*(out++) =
(i == 1) ? '='
: conv_bin2ascii(l >> 6L, data_bin2ascii);
*(out++) = '=';
}
ret += 4;
in += 3;
}

*out = '\0';
return ret;
}

#endif

/**
Expand All @@ -41,7 +82,6 @@
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t max_len;

/* OpenSSL takes an |int| argument so the input cannot exceed that. */
@@ -53,14 +93,16 @@ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {
max_len = (((in->size + 2) / 3) * 4) + 1;
out->ptr = rd_malloc(max_len);

#if WITH_SSL
out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
(unsigned char *)in->ptr, (int)in->size);
#else
out->size = base64_encoding_conversion(
(unsigned char *)out->ptr, (unsigned char *)in->ptr, (int)in->size);
#endif

rd_assert(out->size < max_len);
out->ptr[out->size] = 0;
#else
out->ptr = NULL;
#endif
}


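
The fallback encoder above follows the usual 3-bytes-to-4-characters expansion; a small sketch (mine, not from the PR) of the size arithmetic that rd_base64_encode() relies on when allocating its output buffer:

```c
#include <assert.h>
#include <stddef.h>

/* Base64 turns every group of 3 input bytes into 4 output characters,
 * padding the final group with '='. rd_base64_encode() allocates
 * (((size + 2) / 3) * 4) + 1 bytes, the +1 being the terminating NUL. */
static size_t base64_encoded_len(size_t n) {
        return ((n + 2) / 3) * 4;
}

int main(void) {
        assert(base64_encoded_len(1) == 4);   /* one byte, two '=' pads */
        assert(base64_encoded_len(3) == 4);   /* exact group, no padding */
        assert(base64_encoded_len(16) == 24); /* a UUID: 22 chars + "==" */
        return 0;
}
```
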
74 changes: 74 additions & 0 deletions src/rdkafka.c
@@ -5032,3 +5032,77 @@ int rd_kafka_errno(void) {
int rd_kafka_unittest(void) {
return rd_unittest();
}


/**
* Creates a new UUID.
*
* @return A newly allocated UUID.
*/
rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
int64_t least_significant_bits) {
rd_kafka_Uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_Uuid_t));
uuid->most_significant_bits = most_significant_bits;
uuid->least_significant_bits = least_significant_bits;
return uuid;
}

/**
* Returns a newly allocated copy of the given UUID.
*
* @param uuid UUID to copy.
* @return Copy of the provided UUID.
*
* @remark Dynamically allocated. Deallocate (free) after use.
*/
rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new(
uuid->most_significant_bits, uuid->least_significant_bits);
if (*uuid->base64str)
memcpy(copy_uuid->base64str, uuid->base64str, 23);
return copy_uuid;
}

/**
* @brief Destroy the provided uuid.
*
* @param uuid UUID
*/
void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
rd_free(uuid);
}

const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
if (*uuid->base64str)
return uuid->base64str;

rd_chariov_t in_base64;
char *out_base64_str;
char *uuid_bytes;
uint64_t input_uuid[2];

input_uuid[0] = htobe64(uuid->most_significant_bits);
input_uuid[1] = htobe64(uuid->least_significant_bits);
uuid_bytes = (char *)input_uuid;
in_base64.ptr = uuid_bytes;
in_base64.size = sizeof(uuid->most_significant_bits) +
sizeof(uuid->least_significant_bits);

out_base64_str = rd_base64_encode_str(&in_base64);
if (!out_base64_str)
return NULL;

rd_strlcpy((char *)uuid->base64str, out_base64_str,
23 /* Removing extra ('=') padding */);
rd_free(out_base64_str);
return uuid->base64str;
}

int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->least_significant_bits;
}


int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->most_significant_bits;
}
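
As a side note, the rd_strlcpy() call above copies into a 23-byte window, which is what drops the trailing "==" padding of the 24-character encoding. A standalone sketch of that behaviour (using snprintf() as a portable stand-in for rd_strlcpy(); the literal below is the encoding of msb 0x0123456789abcdef, lsb 0xfedcba9876543210):

```c
#include <stdio.h>
#include <string.h>

int main(void) {
        /* A 16-byte UUID always encodes to 24 base64 characters ending
         * in "==". */
        const char encoded[] = "ASNFZ4mrze/+3LqYdlQyEA==";
        char trimmed[23];

        /* Copy at most sizeof(trimmed) - 1 = 22 characters plus the NUL,
         * exactly like rd_strlcpy(dst, src, 23): the padding is dropped. */
        snprintf(trimmed, sizeof(trimmed), "%s", encoded);

        printf("%s (%zu chars)\n", trimmed, strlen(trimmed)); /* 22 chars */
        return 0;
}
```
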
82 changes: 82 additions & 0 deletions src/rdkafka.h
@@ -262,6 +262,7 @@ typedef struct rd_kafka_error_s rd_kafka_error_t;
typedef struct rd_kafka_headers_s rd_kafka_headers_t;
typedef struct rd_kafka_group_result_s rd_kafka_group_result_t;
typedef struct rd_kafka_acl_result_s rd_kafka_acl_result_t;
typedef struct rd_kafka_Uuid_s rd_kafka_Uuid_t;
/* @endcond */


@@ -1631,6 +1632,75 @@ rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage);
/**@}*/


/**
* @name UUID
* @{
*
*/

/**
* @brief Computes the base64 encoding of the given UUID.
* @param uuid UUID for which the base64 encoding is required.
*
* @return base64 encoded string for the given UUID, or NULL if the
*         conversion failed or is not supported.
*/
RD_EXPORT const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid);

/**
* @brief Gets least significant 64 bits for the given UUID.
*
* @param uuid UUID
*
* @return least significant 64 bits for the given UUID.
*/
RD_EXPORT int64_t
rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid);


/**
* @brief Gets most significant 64 bits for the given UUID.
*
* @param uuid UUID
*
* @return most significant 64 bits for the given UUID.
*/
RD_EXPORT int64_t
rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid);


/**
* @brief Creates a new UUID.
*
* @param most_significant_bits most significant 64 bits of the 128 bits UUID.
* @param least_significant_bits least significant 64 bits of the 128 bits UUID.
*
* @return A newly allocated UUID.
* @remark Must be freed after use using rd_kafka_Uuid_destroy()
*/
RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
int64_t least_significant_bits);

/**
* @brief Copies the given UUID.
*
* @param uuid UUID to be copied.
*
* @return A newly allocated copy of the provided UUID.
* @remark Must be freed after use using rd_kafka_Uuid_destroy()
*/
RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid);

/**
* @brief Destroy the provided uuid.
*
* @param uuid UUID
*/
RD_EXPORT void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid);

/**@}*/


/**
* @name Configuration interface
* @{
@@ -8172,6 +8242,18 @@ RD_EXPORT
const char *
rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc);

/**
* @brief Gets the topic id for the \p topicdesc topic.
*
* @param topicdesc The topic description.
* @return The topic id
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p topicdesc object.
*/
RD_EXPORT const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id(
const rd_kafka_TopicDescription_t *topicdesc);

/**
* @brief Gets if the \p topicdesc topic is internal.
*
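
Taken together, the new public UUID API can be exercised on its own. A usage sketch (the bit patterns are arbitrary placeholders and the include path depends on how librdkafka is installed):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        /* Build a UUID from its two 64-bit halves, then copy it. */
        rd_kafka_Uuid_t *uuid =
            rd_kafka_Uuid_new(0x0123456789abcdefLL, 0x1122334455667788LL);
        rd_kafka_Uuid_t *copy = rd_kafka_Uuid_copy(uuid);

        printf("msb=%lld lsb=%lld base64=%s\n",
               (long long)rd_kafka_Uuid_most_significant_bits(copy),
               (long long)rd_kafka_Uuid_least_significant_bits(copy),
               rd_kafka_Uuid_base64str(copy));

        /* Original and copy are separately allocated; destroy both. */
        rd_kafka_Uuid_destroy(copy);
        rd_kafka_Uuid_destroy(uuid);
        return 0;
}
```
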
20 changes: 15 additions & 5 deletions src/rdkafka_admin.c
@@ -8010,6 +8010,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) {
* @brief Create a new TopicDescription object.
*
* @param topic topic name
* @param topic_id topic id
* @param partitions Array of partition metadata (rd_kafka_metadata_partition).
* @param partition_cnt Number of partitions in partition metadata.
* @param authorized_operations acl operations allowed for topic.
@@ -8019,6 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) {
*/
static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
const char *topic,
rd_kafka_Uuid_t topic_id,
const struct rd_kafka_metadata_partition *partitions,
int partition_cnt,
const struct rd_kafka_metadata_broker *brokers_sorted,
@@ -8032,6 +8034,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
int i;
topicdesc = rd_calloc(1, sizeof(*topicdesc));
topicdesc->topic = rd_strdup(topic);
topicdesc->topic_id = topic_id;
topicdesc->partition_cnt = partition_cnt;
topicdesc->is_internal = is_internal;
if (error)
@@ -8063,9 +8066,10 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
*/
static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new_error(const char *topic,
rd_kafka_Uuid_t topic_id,
rd_kafka_error_t *error) {
return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, NULL, 0,
NULL, 0, rd_false, error);
return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL,
NULL, 0, NULL, 0, rd_false, error);
}

static void
@@ -8075,7 +8079,6 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
RD_IF_FREE(topicdesc->topic, rd_free);
RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy);
RD_IF_FREE(topicdesc->authorized_operations, rd_free);

for (i = 0; i < topicdesc->partition_cnt; i++)
rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]);
rd_free(topicdesc->partitions);
@@ -8142,6 +8145,11 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc) {
return topicdesc->error;
}

const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id(
const rd_kafka_TopicDescription_t *topicdesc) {
return &topicdesc->topic_id;
}

const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics(
const rd_kafka_DescribeTopics_result_t *result,
size_t *cntp) {
@@ -8240,7 +8248,8 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
mdi->topics[i].topic_authorized_operations,
&authorized_operation_cnt);
topicdesc = rd_kafka_TopicDescription_new(
md->topics[i].topic, md->topics[i].partitions,
md->topics[i].topic, mdi->topics[i].topic_id,
md->topics[i].partitions,
md->topics[i].partition_cnt, mdi->brokers_sorted,
mdi->brokers, md->broker_cnt, authorized_operations,
authorized_operation_cnt,
Expand All @@ -8251,7 +8260,8 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
md->topics[i].err, "%s",
rd_kafka_err2str(md->topics[i].err));
topicdesc = rd_kafka_TopicDescription_new_error(
md->topics[i].topic, error);
md->topics[i].topic, mdi->topics[i].topic_id,
error);
rd_kafka_error_destroy(error);
}
orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
Expand Down
7 changes: 4 additions & 3 deletions src/rdkafka_admin.h
@@ -525,9 +525,10 @@ struct rd_kafka_TopicPartitionInfo_s {
* @struct DescribeTopics result
*/
struct rd_kafka_TopicDescription_s {
char *topic; /**< Topic name */
int partition_cnt; /**< Number of partitions in \p partitions*/
rd_bool_t is_internal; /**< Is the topic internal to Kafka? */
char *topic; /**< Topic name */
rd_kafka_Uuid_t topic_id; /**< Topic Id */
int partition_cnt; /**< Number of partitions in \p partitions*/
rd_bool_t is_internal; /**< Is the topic internal to Kafka? */
rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */
rd_kafka_error_t *error; /**< Topic error reported by broker */
int authorized_operations_cnt; /**< Count of operations allowed for
2 changes: 1 addition & 1 deletion src/rdkafka_buf.h
@@ -1454,7 +1454,7 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
} while (0)

static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
rd_kafka_uuid_t *uuid) {
rd_kafka_Uuid_t *uuid) {
rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}