Skip to content

Commit 248ca27

Browse files
committed
Allow message payload to be NULL.
NULL message payloads are used with compaction topics introduced in 0.8.1 which use the NULL payload to indicate the corresponding key should be deleted. The Kafka protocol guide also notes that the Value (payload) may be NULL. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets This commit makes these changes: - prevents NULL payloads from being copied when RD_KAFKA_MSG_F_COPY is set. - prevents payload bytes being copied into rkbuf when NULL. - sets rkm_len to -1 when payload is NULL (just like key). - Allows NULL payloads to be received in a fetch.
1 parent ff3fb1e commit 248ca27

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

Diff for: src/rdkafka_broker.c

+19-8
Original file line numberDiff line numberDiff line change
@@ -2127,6 +2127,10 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
21272127

21282128

21292129
/* Value(payload) length */
2130+
if (rkm->rkm_payload == NULL) {
2131+
/* Set length to -1 to indicate a NULL value. */
2132+
rkm->rkm_len = -1;
2133+
}
21302134
msghdr->part4.Value_len = htonl(rkm->rkm_len);
21312135
msghdr->part3.Crc =
21322136
crc32(msghdr->part3.Crc,
@@ -2138,12 +2142,15 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
21382142
rd_kafka_buf_push(rkbuf, &msghdr->part4, sizeof(msghdr->part4));
21392143

21402144

2141-
/* Payload */
2142-
msghdr->part3.Crc =
2143-
crc32(msghdr->part3.Crc,
2144-
rkm->rkm_payload,
2145-
rkm->rkm_len);
2146-
rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len);
2145+
2146+
/* Add Payload if not NULL */
2147+
if (rkm->rkm_payload != NULL) {
2148+
msghdr->part3.Crc =
2149+
crc32(msghdr->part3.Crc,
2150+
rkm->rkm_payload,
2151+
rkm->rkm_len);
2152+
rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len);
2153+
}
21472154

21482155

21492156
/* Finalize Crc */
@@ -2801,14 +2808,18 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
28012808
/* Create op and push on temporary queue. */
28022809
rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
28032810

2811+
rko->rko_rkmessage.key = NULL;
28042812
if (!RD_KAFKAP_BYTES_IS_NULL(Key)) {
28052813
rko->rko_rkmessage.key = Key->data;
28062814
rko->rko_rkmessage.key_len =
28072815
RD_KAFKAP_BYTES_LEN(Key);
28082816
}
28092817

2810-
rko->rko_rkmessage.payload = Value->data;
2811-
rko->rko_rkmessage.len = Value_len;
2818+
rko->rko_rkmessage.payload = NULL;
2819+
if (!RD_KAFKAP_BYTES_IS_NULL(Value)) {
2820+
rko->rko_rkmessage.payload = Value->data;
2821+
rko->rko_rkmessage.len = RD_KAFKAP_BYTES_LEN(Value);
2822+
}
28122823

28132824
rko->rko_rkmessage.offset = hdr->Offset;
28142825
rko->rko_rkmessage.rkt = rktp->rktp_rkt;

Diff for: src/rdkafka_msg.c

+7
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_topic_t *rkt,
9393
rkt->rkt_conf.message_timeout_ms * 1000;
9494
}
9595

96+
/* Check for a NULL payload */
97+
if (payload == NULL) {
98+
rkm->rkm_payload = NULL;
99+
rkm->rkm_len = 0;
100+
return rkm;
101+
}
102+
96103
if (msgflags & RD_KAFKA_MSG_F_COPY) {
97104
/* Copy payload to space following the ..msg_t */
98105
rkm->rkm_payload = (void *)(rkm+1);

0 commit comments

Comments
 (0)