-
Notifications
You must be signed in to change notification settings - Fork 659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Offset metadata #353
Offset metadata #353
Conversation
It looks like @damour hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
[clabot:check] |
It looks like @damour hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
[clabot:check] |
@confluentinc It looks like @damour just signed our Contributor License Agreement. 👍 Always at your service, clabot |
|
||
_, err = c.CommitOffsets([]kafka.TopicPartition{{ | ||
Topic: &topic, | ||
Partition: 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should use the provided partition
// when using localhost brokers on OSX, since the OSX resolver | ||
// will return the IPv6 addresses first. | ||
// You typically don't need to specify this configuration property. | ||
"broker.address.family": "v4", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
"group.id": group, | ||
"session.timeout.ms": 6000, | ||
"enable.auto.commit": "false", | ||
"auto.offset.reset": "earliest"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for the last three since you're not actually consuming or subscribing to anything.
@@ -0,0 +1,103 @@ | |||
// Example function-based high-level Apache Kafka consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This description is incorrect
* limitations under the License. | ||
*/ | ||
|
||
// consumer_offset_metadata implements a consumer that commit offset with metadata that represents the state of the partition consumer at that point in time. The |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try to keep lines < 80 columns
package main | ||
|
||
/** | ||
* Copyright 2016 Confluent Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2019
func main() { | ||
|
||
if len(os.Args) < 6 { | ||
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topic> <partition> <offset> \"<metadata>\"\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about having two modes:
- if offset and metadata specified: commit and show committed offset
- if offset and metadata not specified: just show the committed offset
I think that would make it even more useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! Done, PTAL.
kafka/kafka.go
Outdated
} | ||
|
||
return cparts | ||
} | ||
|
||
// Go string to C string without null character. | ||
func convertToCStringNoNull(s string) unsafe.Pointer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just use C.CString()
instead of adding this function, the extra nul byte doesn't matter, just make sure metadata_size does not include it.
Thank you! |
Added ability to commit offset with metadata that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.