Skip to content

Commit 7305785

Browse files
committed
Add support for sending NULL messages (thanks to @secretmike, PR #200)
1 parent 1da5fb7 commit 7305785

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

Diff for: src/rdkafka_broker.c

+15-8
Original file line numberDiff line numberDiff line change
@@ -2127,7 +2127,9 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
21272127

21282128

21292129
/* Value(payload) length */
2130-
msghdr->part4.Value_len = htonl(rkm->rkm_len);
2130+
msghdr->part4.Value_len = htonl(rkm->rkm_payload ?
2131+
rkm->rkm_len :
2132+
RD_KAFKAP_BYTES_LEN_NULL);
21312133
msghdr->part3.Crc =
21322134
crc32(msghdr->part3.Crc,
21332135
(void *)
@@ -2136,14 +2138,16 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
21362138
Value_len));
21372139

21382140
rd_kafka_buf_push(rkbuf, &msghdr->part4, sizeof(msghdr->part4));
2139-
21402141

21412142
/* Payload */
2142-
msghdr->part3.Crc =
2143-
crc32(msghdr->part3.Crc,
2144-
rkm->rkm_payload,
2145-
rkm->rkm_len);
2146-
rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len);
2143+
if (rkm->rkm_payload) {
2144+
msghdr->part3.Crc =
2145+
crc32(msghdr->part3.Crc,
2146+
rkm->rkm_payload,
2147+
rkm->rkm_len);
2148+
rd_kafka_buf_push(rkbuf, rkm->rkm_payload,
2149+
rkm->rkm_len);
2150+
}
21472151

21482152

21492153
/* Finalize Crc */
@@ -2807,7 +2811,10 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
28072811
RD_KAFKAP_BYTES_LEN(Key);
28082812
}
28092813

2810-
rko->rko_rkmessage.payload = Value->data;
2814+
/* Forward NULL message notation to application. */
2815+
rko->rko_rkmessage.payload =
2816+
RD_KAFKAP_BYTES_IS_NULL(Value) ?
2817+
NULL : Value->data;
28112818
rko->rko_rkmessage.len = Value_len;
28122819

28132820
rko->rko_rkmessage.offset = hdr->Offset;

Diff for: src/rdkafka_msg.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
4040
rd_kafka_assert(rk, rk->rk_producer.msg_cnt > 0);
4141
(void)rd_atomic_sub(&rk->rk_producer.msg_cnt, 1);
4242

43-
if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE)
43+
if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
4444
free(rkm->rkm_payload);
4545

4646
if (rkm->rkm_key)
@@ -93,7 +93,7 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_topic_t *rkt,
9393
rkt->rkt_conf.message_timeout_ms * 1000;
9494
}
9595

96-
if (msgflags & RD_KAFKA_MSG_F_COPY) {
96+
if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
9797
/* Copy payload to space following the ..msg_t */
9898
rkm->rkm_payload = (void *)(rkm+1);
9999
memcpy(rkm->rkm_payload, payload, len);

0 commit comments

Comments
 (0)