Skip to content

Commit

Permalink
Reject delivery.report.only.error=true on producer creation (#306)
Browse files Browse the repository at this point in the history
.. to avoid a cgoDr memory leak per successful message.
  • Loading branch information
edenhill committed Mar 23, 2019
1 parent 335b347 commit ae2af5e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
10 changes: 9 additions & 1 deletion kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,15 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
// the original is not mutated.
confCopy := conf.clone()

v, err := confCopy.extract("go.batch.producer", false)
v, err := confCopy.extract("delivery.report.only.error", false)
if v == true {
// FIXME: The filtering of successful DRs must be done in
// the Go client to avoid cgoDr memory leaks.
return nil, newErrorFromString(ErrUnsupportedFeature,
"delivery.report.only.error=true is not currently supported by the Go client")
}

v, err = confCopy.extract("go.batch.producer", false)
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions kafka/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,14 @@ func TestProducerBufferSafety(t *testing.T) {

p.Close()
}

// TestProducerInvalidConfig verifies that invalid configuration is handled correctly.
func TestProducerInvalidConfig(t *testing.T) {

_, err := NewProducer(&ConfigMap{
"delivery.report.only.error": true,
})
if err == nil {
t.Fatalf("Expected NewProducer() to fail with delivery.report.only.error set")
}
}

0 comments on commit ae2af5e

Please sign in to comment.