-
Notifications
You must be signed in to change notification settings - Fork 659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added convenience method StoreMessage that calls StoreOffsets #676
Conversation
Hey @finncolman, @confluentinc It looks like this brave person signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great stuff!
One small nit, maybe the smallest of them all.
A future performance improvement is to make a cgo implementation of StoreOffsets to cut it down to a single C call (instead of something like 4), since each C call has a large overhead in Go.
// StoreMessage stores offset based on the provided message. | ||
// This is a convenience method that uses StoreOffsets to do the actual work. | ||
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error) { | ||
if m.TopicPartition.Error != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should also make sure Offset is >= 0. You never know..
Thanks for your contribution! |
…olman, confluentinc#676) * Added convenience method StoreMessage that calls StoreOffsets * Added '.' at the end of the comment * Added Offset less than 0 check * Moved the check to the correct method Co-authored-by: Finn Colman <[email protected]>
If you have "enable.auto.commit" set to true, but "enable.auto.offset.store" set to false then you need to call StoreOffsets after the consumer processing has completed, but at this stage you only have the message that was fetched, not the offsets.
This means that you end up writing code like this all over the place:
if m.TopicPartition.Error != nil { return nil, newErrorFromString(ErrInvalidArg, "Can't store errored message") } offsets := []TopicPartition{m.TopicPartition} offsets[0].Offset++ return c.StoreOffsets(offsets)
This PR adds a convenience method StoreMessage that does this for you. This is similar to the CommitMessage method in that it simply gets the offset to store from the message.