-
Notifications
You must be signed in to change notification settings - Fork 659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Consumer.OffsetsStore() #72
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,7 +54,6 @@ func TestConsumerAPIs(t *testing.T) { | |
if err != nil { | ||
t.Errorf("SubscribeTopics failed: %s", err) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove whitespace diff |
||
_, err = c.Commit() | ||
if err != nil && err.(Error).Code() != ErrNoOffset { | ||
t.Errorf("Commit() failed: %s", err) | ||
|
@@ -65,6 +64,20 @@ func TestConsumerAPIs(t *testing.T) { | |
t.Errorf("Unsubscribe failed: %s", err) | ||
} | ||
|
||
topic := "gotest" | ||
stored, err := c.OffsetsStore([]TopicPartition{{Topic: &topic, Partition: 0, Offset: 1}}) | ||
if err != nil && err.(Error).Code() != ErrUnknownPartition { | ||
t.Errorf("OffsetsStore() failed: %s", err) | ||
toppar := stored[0] | ||
if toppar.Error.(Error).Code() == ErrUnknownPartition { | ||
t.Errorf("OffsetsStore() TopicPartition error: %s", toppar.Error) | ||
} | ||
} | ||
stored, err = c.OffsetsStore(nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a use for accepting a nil partition set? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, the data type allows it, but is there a point for the StoreOffsets API to allow it? Passing a nil slice should be considered a usage error and not something we need to explicitly handle (I do agree that error handling is necessary, but parameter checking in APIs is not as fruitful as it seems IMHO unless there is a security concern to it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's call it an empty slice. I think that the API should allow passing an empty slice, whatever that means in Go. If someone chooses to pass an empty slice no error is returned. We don't explicitly handle it (as in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note that this part of the diff is now changed to: var empty []TopicPartition
stored, err = c.StoreOffsets(empty) which seems like a valid usage scenario. |
||
if err != nil { | ||
t.Errorf("OffsetsStore(nil) failed: %s", err) | ||
} | ||
|
||
topic1 := "gotest1" | ||
topic2 := "gotest2" | ||
err = c.Assign([]TopicPartition{{Topic: &topic1, Partition: 2}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably be called StoreOffsets to be consistent with CommitOffsets (even though it makes it inconsistent with librdkafka, but that's a hidden gem).