Cursor persistence for consumer offset checkpointing #214
Thanks for the proposal. I'll need some time to think it through. A few questions that immediately come to mind:
- Should cursors be associated with a stream, or with a partition of a stream? I thought partitions were the ones with a defined ordering.
- How long do cursors live for? It sounds like they never get deleted. I think that seems too long.
- What is
- Is there a way to guarantee that
Good catch @spenczar. That is correct in my mind.
My thought would be to store them in a compacted stream, so only the newest cursor position is stored for each consumer. Beyond that, I'm not sure you can make any assumptions when they should be destroyed, other than perhaps in the case of a consumer group being deleted/closed? Curious what @Jmgr's thoughts are.
This should be solved if we rely on Liftbridge streams to store the cursors.
Thanks for the feedback!
Yes, we were not planning to solve that in this issue.
One solution could be that
This could be used as such a primitive, yes. The idea for this proposal is to have a low-level API first.
That would be another option, yes. But then each client/consumer would have to provide some sort of id to be uniquely identifiable?
Yes, there could be a flag that automatically performs this truncation. This could also be part of another API, more related to compaction and retention.
Not really, but that looks like a good solution.
Yes, they should be associated with a stream partition. That would require adding a
That would make sense.
That would be one case. There could also be a stream flag that enables removing cursors when truncating.
That would also work.
I suppose that would be solved if we make
Ah, I thought idempotency was already specified. This line from the original post seemed to ensure it:
I strongly feel that idempotency is required for this design. I assume that "latest" refers to "highest offset," and is not a reference to a wall-time clock.
Yes, exactly.
I propose we set aside the truncation piece into a separate proposal in order to focus this discussion and narrow the scope. IMO that is a separate concern. Specifically, I think
Just an update on this: I am in the process of implementing automatic partition pausing (#218), which I consider to be a prerequisite for cursor persistence.
Newcomer to this project and LOVE it! Shouldn't the consumer offset be automatically handled via message ACK? When a consumer ACKs a message, that consumer's offset should automatically be updated. For the case where messages are not ACKed, the consumer's offset should be incremented upon delivery. This would greatly reduce client complexity, which is one of the benefits of this tech over something like Kafka.
Liftbridge doesn't have consumer acks. In that sense, it's the same type of model as Kafka: the server doesn't do any bookkeeping as such. I do believe client libraries should provide higher-level consumer functionality to checkpoint offsets (similar to Kafka's high-level consumer), but I'm open to hearing people's thoughts/ideas around this.
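A higher-level consumer of the sort mentioned above could checkpoint periodically rather than after every message, trading a bounded amount of reprocessing on restart for fewer persistence round-trips. A minimal sketch, assuming a hypothetical `commit` callback that stands in for whatever persistence call the client library ends up exposing (none of these names are actual Liftbridge APIs):

```go
package main

import "fmt"

// consume sketches client-side offset checkpointing: process messages and
// commit the last processed offset every `every` messages, plus a final
// checkpoint so no progress is lost on a clean shutdown.
func consume(offsets []int64, every int, commit func(int64)) {
	processed := 0
	var last int64 = -1
	for _, off := range offsets {
		// ... process the message at `off` here ...
		last = off
		processed++
		if processed%every == 0 {
			commit(last)
		}
	}
	if processed%every != 0 && last >= 0 {
		commit(last) // final checkpoint on clean shutdown
	}
}

func main() {
	var commits []int64
	consume([]int64{0, 1, 2, 3, 4}, 2, func(o int64) { commits = append(commits, o) })
	fmt.Println(commits) // prints [1 3 4]
}
```

On a crash between checkpoints, the consumer would re-read at most `every - 1` already-processed messages, which is why idempotent processing matters in this model.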
Do you mean you would have one single internal stream to store the cursors of existing streams? Let's say, in order to define a cursor, you need:

```proto
// SetCursorRequest is sent to set a stream cursor's offset.
message SetCursorRequest {
  string stream = 1;   // Stream name
  string cursorId = 2; // User-supplied value to uniquely identify this cursor
  int64 offset = 3;    // Offset where the cursor should be placed
  int32 partition = 4; // Partition
}

// GetCursorRequest is sent to retrieve a stream cursor's offset.
message GetCursorRequest {
  string stream = 1;   // Stream name
  string cursorId = 2; // User-supplied value to uniquely identify this cursor
  int32 partition = 3; // Partition
}
```

Then on the internal stream, let's call it

Would that make sense?
I guess that the cursor would be an
@LaPetiteSouris Yes, that is roughly what I am thinking. I think there is also opportunity for the broker to cache cursors for faster lookups, perhaps with an LRU or some such, but the data would be backed by a partitioned stream. |
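As a rough sketch of that caching idea (illustrative only: `load` stands in for a read of the backing cursor partition, and none of these names are Liftbridge APIs):

```go
package main

import (
	"container/list"
	"fmt"
)

// cursorCache is a hypothetical LRU cache the broker could put in front of
// the cursor stream. On a miss, the user-supplied load callback stands in
// for reading the backing partitioned stream.
type cursorCache struct {
	cap   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // cursor key -> list element
	load  func(key string) int64
}

type entry struct {
	key    string
	offset int64
}

func newCursorCache(capacity int, load func(string) int64) *cursorCache {
	return &cursorCache{
		cap:   capacity,
		order: list.New(),
		items: map[string]*list.Element{},
		load:  load,
	}
}

// Get returns the cached offset for key, falling back to load on a miss
// and evicting the least recently used entry when over capacity.
func (c *cursorCache) Get(key string) int64 {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		return el.Value.(*entry).offset
	}
	off := c.load(key) // cache miss: read from the backing stream
	c.items[key] = c.order.PushFront(&entry{key, off})
	if c.order.Len() > c.cap {
		last := c.order.Back() // evict least recently used
		c.order.Remove(last)
		delete(c.items, last.Value.(*entry).key)
	}
	return off
}

func main() {
	loads := 0
	cache := newCursorCache(2, func(key string) int64 { loads++; return 42 })
	cache.Get("orders/0/worker-a")
	cache.Get("orders/0/worker-a") // served from cache, no second load
	fmt.Println(loads)             // prints 1
}
```

A real implementation would also have to invalidate or update entries on SetCursor so the cache never serves an offset older than the stream's.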
FYI, I'm planning to start working on this soon. |
Addressed in #268 |
This is a feature request and API proposal to allow cursor persistence for consumer offset checkpointing.
Use-case
Ably stores messages that are sent by customers. Those messages are published to a Liftbridge cluster for reliable storage and need to be processed by one or more consumers (processors). This processing needs to be performed on each message. Message consumers can be interrupted for various reasons, but after a crash a new instance (or an older instance resuming the work) must be able to continue processing from the position where the interrupted consumer stopped. We need a way to reliably store per-consumer cursors pointing to a position within a stream.
Rather than re-implementing a reliable storage system for cursors we would like to use Liftbridge to store a certain number of cursors per stream. Those cursors would then be replicated across the Liftbridge cluster and would be stored and retrieved by consumers. A cursor's offset should only be allowed to monotonically increase, so that if multiple consumers try to set the offset of the same cursor only the latest one is taken into account.
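The monotonic-increase rule can be sketched as a compare-and-update against the last stored offset. This is an illustrative in-memory stand-in for the cursor storage, not the proposed implementation; the key format and names are made up for the example:

```go
package main

import (
	"fmt"
	"sync"
)

// cursorStore is a hypothetical in-memory stand-in for the replicated
// cursor storage; a key identifies a (stream, partition, cursorId) tuple.
type cursorStore struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newCursorStore() *cursorStore {
	return &cursorStore{offsets: map[string]int64{}}
}

// SetCursor applies the monotonic rule: an update is accepted only if it
// moves the cursor forward, so a stale writer (e.g. a crashed consumer
// restarting with an old offset) cannot rewind another's progress.
func (s *cursorStore) SetCursor(key string, offset int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.offsets[key]; ok && offset <= cur {
		return false // stale update: keep the highest offset
	}
	s.offsets[key] = offset
	return true
}

// GetCursor returns the stored offset, or -1 if no cursor exists yet.
func (s *cursorStore) GetCursor(key string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.offsets[key]; ok {
		return cur
	}
	return -1
}

func main() {
	s := newCursorStore()
	s.SetCursor("orders/0/worker-a", 100)
	s.SetCursor("orders/0/worker-a", 50)          // rejected: not monotonic
	fmt.Println(s.GetCursor("orders/0/worker-a")) // prints 100
}
```

Because rejected writes are simply ignored, concurrent SetCursor calls from competing consumers converge on the highest offset regardless of arrival order.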
Processed messages are no longer required and can be truncated. We would also like a feature that truncates all messages in a stream older than the earliest cursor, i.e. messages that have been processed by all consumers.
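The truncation rule amounts to computing a low-water mark: the minimum offset across all cursors on a partition, below which every consumer has finished. A small illustrative sketch (names are made up for the example):

```go
package main

import "fmt"

// minCursor computes the truncation low-water mark: the smallest offset
// held by any consumer's cursor. Messages older than this have been
// processed by all consumers and are safe to remove. Returns -1 when no
// cursors exist (in which case nothing should be truncated).
func minCursor(cursors map[string]int64) int64 {
	min := int64(-1)
	for _, off := range cursors {
		if min == -1 || off < min {
			min = off
		}
	}
	return min
}

func main() {
	cursors := map[string]int64{
		"worker-a": 120,
		"worker-b": 87, // slowest consumer bounds the truncation point
		"worker-c": 150,
	}
	fmt.Println(minCursor(cursors)) // prints 87
}
```

This is why long-lived or abandoned cursors matter for the retention discussion above: one stale cursor pins the low-water mark and prevents any truncation.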
API proposal
New RPCs to the `API` service:

New messages:

Go API

New functions to the `Client` interface:

Related to #46.