-
Notifications
You must be signed in to change notification settings - Fork 659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose the librdkafka stats as events in the go client #57
Conversation
kafka/integration_test.go
Outdated
if mt.msgcnt == int64(cap(mt.msgs)) { | ||
mt.t.Logf("***** Buffer full. not saving message: %v\n", e) | ||
} else { | ||
// mt.t.Logf("Saving message to mt.msgs[%d]: %v\n", mt.msgcnt, e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove comment
kafka/integration_test.go
Outdated
} else { | ||
// mt.t.Logf("Saving message to mt.msgs[%d]: %v\n", mt.msgcnt, e) | ||
mt.msgs[mt.msgcnt] = e | ||
mt.msgcnt++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is msgcnt needed, isn't len(mt.msgs) sufficient?
kafka/integration_test.go
Outdated
} | ||
case PartitionEOF: | ||
break // silence | ||
case *Stats: | ||
mt.t.Logf("Stats: %v", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The JSON should be verified to be syntactically correct, if there is an easy way to do that with Go without requiring full-blow corresponding go types
kafka/integration_test.go
Outdated
mt.t.Fatalf("Consumer error: %v", e) | ||
//mt.t.Logf("Ignored event: %v\n", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove comment
kafka/integration_test.go
Outdated
default: | ||
// there are other unhandled events such as OffsetsCommitted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this meant as a warning or fixme, ..?
Maybe we should handle it instead.
kafka/integration_test.go
Outdated
@@ -252,7 +285,9 @@ func producerTest(t *testing.T, testname string, testmsgs []*testmsgType, pc pro | |||
} | |||
|
|||
mt := msgtrackerStart(t, len(testmsgs)) | |||
|
|||
if pc.withStats { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't this be set along with statistics.interval.ms at line ~276?
kafka/integration_test.go
Outdated
|
||
// verify that stats events are received | ||
if pc.withStats { | ||
if mt.statscnt == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine into one if statement
kafka/integration_test.go
Outdated
@@ -351,7 +403,9 @@ func consumerTest(t *testing.T, testname string, msgcnt int, cc consumerCtrl, co | |||
|
|||
expCnt := msgcnt | |||
mt := msgtrackerStart(t, expCnt) | |||
|
|||
if cc.withStats { | |||
mt.withStats = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dito
kafka/integration_test.go
Outdated
if cc.withStats { | ||
//wait for a while for stats to show up | ||
for i := 0; i < 3; i++ { | ||
if mt.statscnt != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this thread-safe? mt.statscnt is updated through another go-routine, right?
I suggest changing this to a channel instead to signal completion.
E.g., let the stats receiving go-routine count the number of stats and send a "done" message on a channel when it is done, which is waited on here (with a reasonable timeout).
kafka/integration_test.go
Outdated
@@ -552,6 +622,18 @@ func TestProducerFuncDR(t *testing.T) { | |||
}) | |||
} | |||
|
|||
// test producer function-based API with stats and delivery report |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As with librdkafka I'd like to see a unittest for the stats that doesnt rely on broker connectivity.
This looks great! Just a couple of things needs sorting out |
@hqin Can you rebase and squash/fixup your commits to a single one? Something like:
|
kafka/stats_event_test.go
Outdated
|
||
// test if the stats string can be decoded into JSON | ||
var raw map[string]interface{} | ||
json.Unmarshal([]byte(e.String()), &raw) // convert string to json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should check for err return here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
kafka/stats_event_test.go
Outdated
Value: []byte("Own drChan"), Key: []byte("This is my key")} | ||
|
||
} | ||
p.Flush(2000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since no brokers are configured and the default message.timeout.ms is 5 minutes this flush(2s) won't do anything.
I suggest removing it and just relying on the timeout below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed p.Flush()
kafka/stats_event_test.go
Outdated
func testProducerFunc(t *testing.T, withProducerChannel bool) { | ||
|
||
p, err := NewProducer(&ConfigMap{ | ||
"statistics.interval.ms": 500, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason this is so "high"? 50 ms should suffice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to 50ms
0e63fb1
to
844faf0
Compare
LGTM. |
t.Fatalf("json unmarshall error: %s", err) | ||
} | ||
t.Logf("Stats['name']: %s", raw["name"]) | ||
close(statsReceived) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edenhill If the eventCh
has more than one Stats
messages, then statsReceived
would be closed twice, which should panic the program. I wonder how this actually can work ? In my understanding, golang channel cannot be closed twice.
@edenhill Please review the changes along with the librdkafka PR confluentinc/librdkafka#1171 ("added support for stats as events")