Skip to content

Commit

Permalink
Added new data type UUID for topic id. Added topic id support in Meta…
Browse files Browse the repository at this point in the history
…data Response.
  • Loading branch information
pranavrth committed May 29, 2023
1 parent 4b63c6c commit 1c9e5c2
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 28 deletions.
2 changes: 1 addition & 1 deletion INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
| 3 | Metadata | 12 | 9 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c\
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
63 changes: 63 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdkafka_int.h"
#include "rdbase64.h"

/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out
*
* out->ptr must be freed after use.
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {
size_t max_len;

max_len = (((in->size + 2) / 3) * 4) + 1;
out->ptr = rd_malloc(max_len);
rd_assert(out->ptr);

out->size = EVP_EncodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr,
(int)in->size);

rd_assert(out->size <= max_len);
out->ptr[out->size] = 0;
}


/**
* @brief Base64 encode binary input \p in
* @returns a newly allocated, base64-encoded string
*
* Returned string must be freed after use
*/
char * rd_base64_encode_str(const rd_chariov_t *in) {
rd_chariov_t out;
rd_base64_encode(in, &out);
return out.ptr;
}
33 changes: 33 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdkafka_int.h"

void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

char * rd_base64_encode_str(const rd_chariov_t *in);
42 changes: 42 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rdlist.h"
#include "rdbuf.h"
#include "rdkafka_msgbatch.h"
#include "rdbase64.h"

typedef struct rd_kafka_broker_s rd_kafka_broker_t;

Expand Down Expand Up @@ -1195,6 +1196,17 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
}


/**
* Write uint64_t to buffer.
* The value will be endian-swapped before write.
*/
static RD_INLINE size_t rd_kafka_buf_write_u64(rd_kafka_buf_t *rkbuf,
uint64_t v) {
v = htobe64(v);
return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1407,4 +1419,34 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


#define rd_kafka_buf_read_uuid(rkbuf, uuid) \
do { \
uint64_t _msb; \
uint64_t _lsb; \
rd_chariov_t in_base64; \
char *out_base64_str; \
char *uuid_bytes; \
uint64_t input_uuid[2]; \
rd_kafka_buf_read(rkbuf, &_msb, sizeof(_msb)); \
rd_kafka_buf_read(rkbuf, &_lsb, sizeof(_lsb)); \
input_uuid[0] = _msb; \
input_uuid[1] = _lsb; \
uuid_bytes = (char *)input_uuid; \
in_base64.ptr = uuid_bytes; \
in_base64.size = sizeof(_msb) + sizeof(_lsb); \
out_base64_str = rd_base64_encode_str(&in_base64); \
(uuid)->most_significant_bits = be64toh(_msb); \
(uuid)->least_significant_bits = be64toh(_lsb); \
rd_strlcpy((uuid)->base64str, out_base64_str, 23); \
free(out_base64_str); \
} while (0)


static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
rd_kafka_uuid_t *uuid) {
rd_kafka_buf_write_u64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_u64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
md->topics[i].topic);

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
}

if (ApiVersion >= 1) {
int8_t is_internal;
rd_kafka_buf_read_i8(rkbuf, &is_internal);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
rd_kafka_uuid_t topic_id;
} rd_kafka_metadata_topic_internal_t;


Expand Down
20 changes: 20 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,26 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
* @brief UUID
*
*/
typedef struct rd_kafka_uuid_s {
uint64_t most_significant_bits;
uint64_t least_significant_bits;
char base64str[23];
} rd_kafka_uuid_t;

#define RD_KAFKA_ZERO_UUID \
{ 0, 0 }

#define RD_KADKA_METADATA_TOPIC_ID \
{ 0, 1 }

static RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
free(uuid);
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
Expand Down
15 changes: 11 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2207,11 +2207,12 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
int16_t ApiVersion = 0;
size_t of_TopicArrayCnt;
int features;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
int *full_incr = NULL;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
int *full_incr = NULL;
rd_kafka_uuid_t zero_uuid = RD_KAFKA_ZERO_UUID;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 9, &features);
rkb, RD_KAFKAP_Metadata, 0, 12, &features);

rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
Expand Down Expand Up @@ -2306,6 +2307,12 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
rd_list_copy(topics, rd_list_string_copy, NULL);

RD_LIST_FOREACH(topic, topics, i) {
if (ApiVersion >= 10) {
/* FIXME: Not supporting topic id in the request
* right now. Update this to correct topic
* id once KIP-516 is fully implemented. */
rd_kafka_buf_write_uuid(rkbuf, &zero_uuid);
}
rd_kafka_buf_write_str(rkbuf, topic, -1);
/* Tags for previous topic */
rd_kafka_buf_write_tags(rkbuf);
Expand All @@ -2331,7 +2338,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
"on broker auto.create.topics.enable configuration");
}

if (ApiVersion >= 8 && ApiVersion < 10) {
if (ApiVersion >= 8 && ApiVersion <= 10) {
/* TODO: implement KIP-430 */
/* IncludeClusterAuthorizedOperations */
rd_kafka_buf_write_bool(rkbuf, rd_false);
Expand Down
20 changes: 1 addition & 19 deletions src/rdkafka_sasl_oauthbearer_oidc.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,7 @@
#include <curl/curl.h>
#include "rdhttp.h"
#include "rdkafka_sasl_oauthbearer_oidc.h"


/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out
*/
static void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {
size_t max_len;

max_len = (((in->size + 2) / 3) * 4) + 1;
out->ptr = rd_malloc(max_len);
rd_assert(out->ptr);

out->size = EVP_EncodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr,
(int)in->size);

rd_assert(out->size <= max_len);
out->ptr[out->size] = 0;
}
#include "rdbase64.h"


/**
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_sasl_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static char *rd_kafka_sasl_scram_get_attr(const rd_chariov_t *inbuf,
* @brief Base64 encode binary input \p in
* @returns a newly allocated, base64-encoded string or NULL on error.
*/
static char *rd_base64_encode(const rd_chariov_t *in) {
static char *rd_base64_encode_scram(const rd_chariov_t *in) {
char *ret;
size_t ret_len, max_len;

Expand Down Expand Up @@ -486,7 +486,7 @@ static int rd_kafka_sasl_scram_build_client_final_message(
}

/* Store the Base64 encoded ServerSignature for quick comparison */
state->ServerSignatureB64 = rd_base64_encode(&ServerSignature);
state->ServerSignatureB64 = rd_base64_encode_scram(&ServerSignature);
if (state->ServerSignatureB64 == NULL) {
rd_free(client_final_msg_wo_proof.ptr);
return -1;
Expand All @@ -511,7 +511,7 @@ static int rd_kafka_sasl_scram_build_client_final_message(


/* Base64 encoded ClientProof */
ClientProofB64 = rd_base64_encode(&ClientProof);
ClientProofB64 = rd_base64_encode_scram(&ClientProof);
if (ClientProofB64 == NULL) {
rd_free(client_final_msg_wo_proof.ptr);
return -1;
Expand Down
2 changes: 2 additions & 0 deletions win32/librdkafka.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
<ClInclude Include="..\src\rdatomic.h" />
<ClInclude Include="..\src\rdavg.h" />
<ClInclude Include="..\src\rdbuf.h" />
<ClInclude Include="..\src\rdbase64.h" />
<ClInclude Include="..\src\rdendian.h" />
<ClInclude Include="..\src\rdfloat.h" />
<ClInclude Include="..\src\rdgz.h" />
Expand Down Expand Up @@ -174,6 +175,7 @@
<ClCompile Include="..\src\crc32c.c" />
<ClCompile Include="..\src\rdaddr.c" />
<ClCompile Include="..\src\rdbuf.c" />
<ClInclude Include="..\src\rdbase64.h" />
<ClCompile Include="..\src\rdcrc32.c" />
<ClCompile Include="..\src\rdfnv1a.c" />
<ClCompile Include="..\src\rdgz.c" />
Expand Down

0 comments on commit 1c9e5c2

Please sign in to comment.