memory leaked and error silenced with go.batch.producer set to true and big messages sent to produceChannel #1332
Comments
This problem troubles me a lot, and I'm sad that there are bugs in this repo but nobody comes forward to help us.
About two days later: OOM again.
Nobody comes forward to help us. So sad.
Dear maintainer,

We traced the leak to rd_kafka_produce_batch in librdkafka, which creates each message like this:

```c
/* Create message */
rkm = rd_kafka_msg_new0(
    rkt,
    (msgflags & RD_KAFKA_MSG_F_PARTITION)
        ? rkmessages[i].partition
        : partition,
    msgflags, rkmessages[i].payload, rkmessages[i].len,
    rkmessages[i].key, rkmessages[i].key_len,
    rkmessages[i]._private, &rkmessages[i].err, NULL, NULL,
    utc_now, now);
if (unlikely(!rkm)) {
        if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
                all_err = rkmessages[i].err;
        continue;
}
```

When a message payload exceeds the configured maximum size (in my setup, it was 1MB), rd_kafka_msg_new0 fails the size check below, sets RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, and returns NULL:

```c
if (unlikely(len > INT32_MAX || keylen > INT32_MAX ||
             rd_kafka_msg_max_wire_size(keylen, len, hdrs_size) >
                 (size_t)rkt->rkt_rk->rk_conf.max_msg_size)) {
        *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
        if (errnop)
                *errnop = EMSGSIZE;
        return NULL;
}
```

The golang wrapper's batch producer passes RD_KAFKA_MSG_F_FREE, handing ownership of each payload buffer to librdkafka:

```go
func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
	crkt := p.handle.getRkt(topic)

	cmsgs := make([]C.rd_kafka_message_t, len(msgs))
	for i, m := range msgs {
		p.handle.messageToC(m, &cmsgs[i])
	}
	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
	if r == -1 {
		return newError(C.rd_kafka_last_error())
	}
	return nil
}
```

Even though RD_KAFKA_MSG_F_FREE is set, rd_kafka_produce_batch never frees the payload of a message it failed to create, so every oversized message leaks its payload buffer. The error is also silenced: only RD_KAFKA_RESP_ERR__QUEUE_FULL is recorded in all_err, and produceBatch returns nil, so the Go application never learns that its messages were dropped.

The following patch to src/rdkafka_msg.c frees the payload in this error path and stops the leak for us:

```diff
diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
index 3fc3967c..00f47879 100644
--- a/src/rdkafka_msg.c
+++ b/src/rdkafka_msg.c
@@ -757,6 +757,9 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
                 if (unlikely(!rkm)) {
                         if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
                                 all_err = rkmessages[i].err;
+                        if (msgflags & RD_KAFKA_MSG_F_FREE && rkmessages[i].payload) {
+                                rd_free(rkmessages[i].payload);
+                        }
                         continue;
                 }
```
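Until the payload is freed on this error path, one possible client-side guard is to reject oversized payloads before they reach ProduceChannel(). The sketch below only illustrates that idea; the maxMsgSize constant and the GuardedSend helper are hypothetical names, not part of the library, and the ~1MB limit assumes message.max.bytes is left at its librdkafka default:

```go
package kafkaguard

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// maxMsgSize approximates the producer's message.max.bytes limit
// (assumption: the ~1MB librdkafka default). librdkafka actually compares
// the full wire size, so a payload just under this limit can still be
// rejected once key and header overhead are added.
const maxMsgSize = 1000000

// GuardedSend refuses payloads that librdkafka would reject with
// MSG_SIZE_TOO_LARGE, so they never reach the batch-producer path that
// currently leaks their buffers and swallows the error.
func GuardedSend(p *kafka.Producer, topic string, value []byte) error {
	if len(value) > maxMsgSize {
		return fmt.Errorf("payload of %d bytes exceeds message.max.bytes; refusing to enqueue", len(value))
	}
	p.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}
	return nil
}
```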
@ucanme I hope this issue provides some hints for the problem you encountered. However, it's important to note that this issue only reports one possible path of memory leakage, and not all memory-related problems are connected to what's reported here.
@jizhilong I guess I met the same problem as you; sometimes big messages over 1MB may be sent.
@jizhilong
Description
We have a Kafka producer built on confluent-kafka-go. To improve throughput, we set go.batch.producer to true and send messages to producer.ProduceChannel(). Everything works fine, except that the process memory (RSS) keeps growing slowly and eventually leads to an OOM kill.
After some digging, we suspected and then confirmed that this issue is related to batch mode.
How to reproduce
Start a producer process with the following snippet, go run it, watch its output, and monitor the process's CPU and memory usage with top. Two abnormal behaviors were observed. If we comment out the line conf.SetKey("go.batch.producer", true), things go back to normal.
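A minimal sketch of such a reproducer, reconstructed from the description above; the topic name, payload size (about 2MB, above the 1MB limit mentioned earlier), and send interval are assumptions, while bootstrap.servers and go.batch.producer match the reported configuration:

```go
package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	conf := kafka.ConfigMap{}
	conf.SetKey("bootstrap.servers", "192.168.1.99:9092")
	conf.SetKey("go.batch.producer", true) // commenting this line out makes the growth stop

	p, err := kafka.NewProducer(&conf)
	if err != nil {
		panic(err)
	}

	// Drain delivery reports and errors; with batch mode and oversized
	// messages, no error event for them ever shows up here.
	go func() {
		for e := range p.Events() {
			fmt.Printf("event: %v\n", e)
		}
	}()

	topic := "test-topic"            // assumption: any writable topic
	big := make([]byte, 2*1024*1024) // ~2MB payload, larger than the 1MB default limit

	for {
		p.ProduceChannel() <- &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          big,
		}
		time.Sleep(10 * time.Millisecond)
	}
}
```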
Checklist
Please provide the following information:
- confluent-kafka-go and librdkafka version (LibraryVersion()): v2.6.0
- Client configuration: {"bootstrap.servers": "192.168.1.99:9092", "go.batch.producer": true}
- Provide client logs (with "debug": ".." as necessary)