Skip to content

Commit

Permalink
Fix Producer Value&Key slice referencing to avoid cgo pointer checkin…
Browse files Browse the repository at this point in the history
…g failures (#24)
  • Loading branch information
edenhill committed Aug 8, 2017
1 parent e7666ea commit 041b696
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 19 deletions.
66 changes: 47 additions & 19 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *val, size_t val_len, void *key, size_t key_len,
int valIsNull, void *val, size_t val_len,
int keyIsNull, void *key, size_t key_len,
int64_t timestamp,
uintptr_t cgoid) {
Expand All @@ -39,15 +40,17 @@ rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(msgflags),
RD_KAFKA_V_VALUE(val, val_len),
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_VALUE(valIsNull ? NULL : val, val_len),
RD_KAFKA_V_KEY(keyIsNull ? NULL : key, key_len),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_OPAQUE((void *)cgoid),
RD_KAFKA_V_END);
#else
if (timestamp)
return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
if (rd_kafka_produce(rkt, partition, msgflags, val, val_len, key, key_len,
if (rd_kafka_produce(rkt, partition, msgflags,
valIsNull ? NULL : val, val_len,
keyIsNull ? NULL : key, key_len,
(void *)cgoid) == -1)
return rd_kafka_last_error();
else
Expand Down Expand Up @@ -84,27 +87,52 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

var valp *byte
var keyp *byte
var empty byte
valLen := 0
keyLen := 0

if msg.Value != nil {
// Three problems:
// 1) There's a difference between an empty Value or Key (length 0, proper pointer) and
// a null Value or Key (length 0, null pointer).
// 2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
// dereference can't be performed on a nil slice.
// 3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
// in the call to the C function.
//
// Solution:
// Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
// point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
// works.
// Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
// to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
//
var valp []byte
var keyp []byte
oneByte := []byte{0}
var valIsNull C.int = 0
var keyIsNull C.int = 0
var valLen int
var keyLen int

if msg.Value == nil {
valIsNull = 1
valLen = 0
valp = oneByte
} else {
valLen = len(msg.Value)
// allow sending 0-length messages (as opposed to null messages)
if valLen > 0 {
valp = &msg.Value[0]
valp = msg.Value
} else {
valp = &empty
valp = oneByte
}
}
if msg.Key != nil {

if msg.Key == nil {
keyIsNull = 1
keyLen = 0
keyp = oneByte
} else {
keyLen = len(msg.Key)
if keyLen > 0 {
keyp = &msg.Key[0]
keyp = msg.Key
} else {
keyp = &empty
keyp = oneByte
}
}

Expand All @@ -128,8 +156,8 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
cErr := C.do_produce(p.handle.rk, crkt,
C.int32_t(msg.TopicPartition.Partition),
C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
unsafe.Pointer(valp), C.size_t(valLen),
unsafe.Pointer(keyp), C.size_t(keyLen),
valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
C.int64_t(timestamp),
(C.uintptr_t)(cgoid))
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
Expand Down
34 changes: 34 additions & 0 deletions kafka/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kafka

import (
"encoding/json"
"testing"
"time"
)
Expand Down Expand Up @@ -149,3 +150,36 @@ func TestProducerAPIs(t *testing.T) {
t.Errorf("OffsetsForTimes() failed but returned non-nil Offsets: %s\n", offsets)
}
}

// TestProducerBufferSafety verifies issue #24, passing any type of memory backed buffer
// (JSON in this case) to Produce()
func TestProducerBufferSafety(t *testing.T) {

p, err := NewProducer(&ConfigMap{
"socket.timeout.ms": 10,
"default.topic.config": ConfigMap{"message.timeout.ms": 10}})
if err != nil {
t.Fatalf("%s", err)
}

topic := "gotest"
value, _ := json.Marshal(struct{ M string }{M: "Hello Go!"})
empty := []byte("")

// Try combinations of Value and Key: json value, empty, nil
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: value, Key: nil}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: value, Key: value}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: nil, Key: value}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: nil, Key: nil}, nil)

p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: empty, Key: nil}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: empty, Key: empty}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: nil, Key: empty}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: value, Key: empty}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: value, Key: value}, nil)
p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic}, Value: empty, Key: value}, nil)

p.Flush(100)

p.Close()
}

0 comments on commit 041b696

Please sign in to comment.