
Optimistic concurrency control #54

Closed
laszbalo opened this issue Feb 27, 2019 · 3 comments
Labels
enhancement roadmap Included on the current product roadmap

Comments

@laszbalo

Hi,

Is optimistic concurrency control on the roadmap? Something along these lines:

offsetOfTheLastMessage := int64(10) // Somehow obtain the offset of the last message in the stream.

// Create a message envelope to publish.
msg := lift.NewMessage([]byte("Hello, world!"), lift.MessageOptions{
    Key:                    []byte("foo"),          // Key to set on the message.
    AckInbox:               "asd453dadfa",          // Some random subject name.
    OffsetOfTheLastMessage: offsetOfTheLastMessage, // Proposed: expected offset of the last message in the stream.
})

nc.Publish("foo.bar", msg)

The leader/master Liftbridge process would accept the message if and only if the offset of the last message in the stream is equal to the value of the OffsetOfTheLastMessage field sent in the message options.
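
Spelled out as a compare-and-set on the log tail, the check the leader would perform could look roughly like the sketch below. All types and names here are illustrative only, not Liftbridge's actual internals:

package main

import (
	"fmt"
	"sync"
)

// partition is an illustrative stand-in for a stream partition.
type partition struct {
	mu         sync.Mutex
	lastOffset int64
	log        [][]byte
}

// conditionalAppend appends msg only if expectedOffset still matches the
// offset of the last message in the log, i.e. a compare-and-set on the tail.
func (p *partition) conditionalAppend(msg []byte, expectedOffset int64) (int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.lastOffset != expectedOffset {
		return 0, fmt.Errorf("expected last offset %d, actual %d", expectedOffset, p.lastOffset)
	}
	p.log = append(p.log, msg)
	p.lastOffset++
	return p.lastOffset, nil
}

func main() {
	p := &partition{lastOffset: 9} // pretend offsets 0-9 already exist
	if off, err := p.conditionalAppend([]byte("Hello, world!"), 9); err == nil {
		fmt.Println("accepted at offset", off) // accepted at offset 10
	}
	// A second writer still using the stale expected offset is rejected.
	if _, err := p.conditionalAppend([]byte("conflicting write"), 9); err != nil {
		fmt.Println("rejected:", err)
	}
}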

Also, there is an unresolved issue about the same thing on Kafka's issue tracker.

@tylertreat
Member

Interesting, I had not considered it, but I could see a lot of use cases for a CAS-like operation. Can you describe your particular use case? I would love to hear what you'd like to use it for.

Currently I've been focused on implementing log compaction, but I want to keep this open since I think it's a good idea and would be fairly straightforward to implement.

@laszbalo
Author

laszbalo commented Mar 4, 2019

I'm just trying to wrap my head around event sourcing and CQRS. I watched some YouTube videos on the subject, and this requirement came up a couple of times: 1, 2.

At my current stage it only affects the way I am planning to design my system:

  1. one stream per aggregate; an aggregate is like a document in a MongoDB collection, but here it is stored as a list of events in its own stream
  2. I have to apply those events to build an in-memory object representing the aggregate
  3. I can then apply commands to this in-memory aggregate

The above means that only a single process in the whole system can build the aggregate and run commands on it without having to worry about concurrency.

For example, I cannot have multiple command handlers running in parallel, because the first handler might append an event to the stream that has not yet been applied to the second handler's in-memory aggregate. The second handler's view of the aggregate is therefore out of date.
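
For what it's worth, here is a rough sketch of how a command handler could use such a conditional publish: rebuild the aggregate, decide on an event, publish with the expected last offset, and retry on conflict. Everything here (Command, Aggregate, load, conditionalPublish) is an illustrative placeholder, not an existing Liftbridge API:

package occ

import (
	"errors"
	"fmt"
)

// Command and Aggregate are illustrative placeholders, not a real API.
type Command string

type Aggregate struct {
	events []string // events applied so far
}

// decide turns a command into the event it would produce given current state.
func (a *Aggregate) decide(cmd Command) []byte {
	return []byte(fmt.Sprintf("event for %q after %d prior events", cmd, len(a.events)))
}

// handleCommand rebuilds the aggregate, decides on an event, and publishes it
// with the expected last offset; on conflict it reloads and retries.
func handleCommand(
	cmd Command,
	load func() (*Aggregate, int64), // rebuild aggregate and last offset from its stream
	conditionalPublish func(event []byte, expectedLastOffset int64) error, // hypothetical CAS publish
) error {
	for attempt := 0; attempt < 3; attempt++ {
		agg, lastOffset := load()
		event := agg.decide(cmd)
		// Succeeds only if no other handler appended to the stream since load().
		if err := conditionalPublish(event, lastOffset); err == nil {
			return nil
		}
		// Conflict: another handler appended first; rebuild and try again.
	}
	return errors.New("gave up after repeated write conflicts")
}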

@tylertreat
Member

Implemented in #296.
