
Aggregate rehydration (retrieving entire stream history) #280

Closed
alexrudd opened this issue Oct 24, 2020 · 17 comments
@alexrudd
Contributor

Hey,

I'm experimenting with event sourcing and using LiftBridge as an event store. For this I need to be able to retrieve all events for a particular aggregate instance, and then replay those events to get the most recent aggregate state. Once the aggregate is "rehydrated", it's ready to run a command, and the resulting event can be published back to the stream.

This is the same use case as mentioned here: #54 (comment)

I've gotten this working thanks to the recent FetchPartitionMetadata RPC, but it seems more difficult than it ought to be. The calls I'm making to fetch all events that may exist for a stream (which could be zero) are:

  1. CreateStream - ensure the stream actually exists, continuing on ErrStreamExists
  2. FetchPartitionMetadata - so I have access to NewestOffset and can then tell when I've reached the end of the stream
  3. Subscribe - reading messages from the stream until I see a message with an offset that matches NewestOffset

(This doesn't account for events that might arrive between the FetchPartitionMetadata call and the Subscribe call, but that would be solved by the proposed optimistic concurrency support.)
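Step 3 boils down to draining a subscription until a known stop offset. Here is a minimal sketch of that loop, using a plain channel and a local Message type as stand-ins for the Liftbridge subscription callback (these are illustrative types, not the real client API):

```go
// Message is a stand-in for the client's message type.
type Message struct {
	Offset int64
	Value  []byte
}

// consumeUntil collects messages from ch until one with an offset at or
// past stopOffset is seen, mirroring "read until NewestOffset" in step 3.
func consumeUntil(ch <-chan Message, stopOffset int64) []Message {
	var out []Message
	for msg := range ch {
		out = append(out, msg)
		if msg.Offset >= stopOffset {
			break
		}
	}
	return out
}
```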

I'm opening this issue to check whether what I'm doing here makes sense, and to see if there's support for adding a dedicated RPC to synchronously retrieve all or a chunk of a stream.

@alexrudd
Contributor Author

Here's the basic interface I'm trying to implement using Liftbridge:

// EventStore persists streams of aggregate events.
type EventStore interface {
	GetStream(ctx context.Context, id string) (Stream, error)
}

// Stream contains all events published by an aggregate instance.
type Stream interface {
	Events() []proto.Message
	Publish(ctx context.Context, event proto.Message) error
}

The obvious problem comes when streams get too big. But that should be solvable using snapshots and then only fetching events since the latest snapshot's offset.
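The snapshot approach can be sketched as a simple filter: restore state from the latest snapshot, then replay only the events recorded after the snapshot's offset. Event and eventsSince are hypothetical names for illustration, not part of the interface above:

```go
// Event is a hypothetical stored event tagged with its partition offset.
type Event struct {
	Offset int64
	Data   []byte
}

// eventsSince drops events already covered by a snapshot taken at
// snapshotOffset, leaving only those to replay on top of it.
func eventsSince(all []Event, snapshotOffset int64) []Event {
	var out []Event
	for _, e := range all {
		if e.Offset > snapshotOffset {
			out = append(out, e)
		}
	}
	return out
}
```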

@LaPetiteSouris
Contributor

And see if there's support for adding a dedicated RPC to synchronously retrieve all or a chunk of a stream?

If I understand correctly, you want a dedicated RPC that subscribes up to a specific offset?

E.g.

// Subscribe server RPC
Subscribe(req *client.SubscribeRequest, out client.API_SubscribeServer) error

And for SubscribeRequest you may want things like

// SubscribeRequest is sent to subscribe to a stream partition.
message SubscribeRequest {
    int64         endOffset    = 4 [jstype=JS_STRING]; // Offset to end consuming from
}

And the value of endOffset in this case must be set to NewestOffset?

Because by "all" of a stream, we mean up to the end of the stream (NewestOffset)?

Do I have that right?

@alexrudd
Contributor Author

alexrudd commented Oct 25, 2020

If I understand correctly, you want a dedicated RPC that subscribes up to a specific offset?

Yeah, ideally I'd be able to set a "start" and an "end" offset, where for each of those values I might just want to say "the earliest" and "the latest" rather than specifying an actual offset.

This may just be possible using the existing Subscribe rpc by adding some new SubscribeRequest fields:

// SubscribeRequest is sent to subscribe to a stream partition.
message SubscribeRequest {
    // existing fields 1 - 7...
    StopPosition  stopPosition  = 8; // Where to stop consuming
    int64         stopOffset    = 9 [jstype=JS_STRING]; // Offset to stop consuming at
    int64         stopTimestamp = 10 [jstype=JS_STRING]; // Timestamp to stop consuming at
}

// StopPosition determines the stop-position type on a subscription.
enum StopPosition {
    OFFSET      = 0; // Stop at a specified offset
    LATEST      = 1; // Stop at the newest message
    TIMESTAMP   = 2; // Stop at a specified timestamp
}

Then in my code I could do something like this:

var messages []*lift.Message
wg := sync.WaitGroup{}
wg.Add(1)

client.Subscribe(ctx, "myStream", func(msg *lift.Message, err error) {
    if err != nil {
        if err == lift.ErrStopPositionReached {
            wg.Done()
        }
        return
    }

    messages = append(messages, msg)
}, lift.StartAtEarliestReceived(), lift.StopAtLatestReceived())

// wait until the stop position is reached
wg.Wait()

process(messages)

@LaPetiteSouris
Contributor

LaPetiteSouris commented Oct 25, 2020

// StopPosition determines the stop-position type on a subscription.
enum StopPosition {
    OFFSET      = 0; // Stop at a specified offset
    LATEST      = 1; // Stop at the newest message
    TIMESTAMP   = 2; // Stop at a specified timestamp
}

How should the latest message be defined in that case? I guess the concept is that the stream is unbounded, so Subscribe would keep waiting for new messages until timeout. Maybe the only achievable way to make it "synchronous" is to explicitly give it an offset to stop at.

That offset can be obtained from FetchPartitionMetadata before subscription.

I'm no expert in event sourcing, but what you describe looks more like a "batch" than a stream? Are you doing this just for failure recovery, or on first-time bootstrap of the system?

Would it make sense to publish one aggregated message upon every received event even in the replay (rehydration) ?

@alexrudd
Contributor Author

How should the latest message be defined in that case?

This would be the latest offset of the partition at the moment the subscribe request is received. The same value as if you were to have called FetchPartitionMetadata and looked at NewestOffset in that same moment. It's fine if that "latest offset" value is out of date by the time the client receives all messages, as it's expected this will be a rare occurrence that is protected against via optimistic concurrency when publishing back to the stream.

Just so we're on the same page with Event Sourcing, here's a definition:

Event sourcing (ES) describes a style of architecture wherein each state altering operation in an application is captured as an immutable event [1]. These events are appended to an event log. [...] The current state of an application is derived from this series of events. When replayed, this series of events always leads to the exact same state of the application.

How I'm imagining this working in Liftbridge is that each aggregate instance within an application would have its own Liftbridge stream that it publishes events to. When a command is received for a particular aggregate instance, the application must first "rehydrate" that aggregate before executing the command. Rehydration involves fetching all events from the instance's stream and replaying them in order until the current state is reached. Once the aggregate is rehydrated to the current state, the newly received command can be executed and the resulting event published back onto the instance's stream. With the command executed and the event published, the aggregate instance can be discarded from memory.

This would result in a great number of small streams that are published to infrequently.
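Rehydration as described is a left fold over the event log. A toy sketch with a hypothetical bank-account aggregate (a real implementation would decode proto.Message events rather than use these invented types):

```go
// AccountEvent is a hypothetical event type for illustration.
type AccountEvent struct {
	Kind   string // "deposited" or "withdrawn"
	Amount int64
}

// Account is the aggregate state rebuilt from its event stream.
type Account struct {
	Balance int64
}

// Rehydrate replays events in order; the same sequence of events
// always produces the same state.
func Rehydrate(events []AccountEvent) Account {
	var acc Account
	for _, e := range events {
		switch e.Kind {
		case "deposited":
			acc.Balance += e.Amount
		case "withdrawn":
			acc.Balance -= e.Amount
		}
	}
	return acc
}
```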

@tylertreat
Member

I think this can be done using a "stop offset" on subscribe as you indicate. In fact, this is something I've considered in the past to help simplify consumer logic but have just never got to it.

I like the idea and symmetry of having a StopPosition field similar to the StartPosition. In addition to supporting the latest offset, we might want to support the high watermark since this represents the newest committed message. I think this is what most clients would actually want to consume up to, but I'm not positive.

@alexrudd
Contributor Author

If you're both okay with the proposed proto above, then I can open a draft PR to liftbridge-api and then look at implementing this in the server and clients

@tylertreat
Member

I'm good with the proposed proto changes. The only piece I'm not sure about is the behavior of StopPosition_LATEST. Should it consume up to the latest committed offset (i.e. high watermark) or the latest offset, whether it's committed or not? If it's uncommitted, the consumer would block until it's committed.

       +---+---+---+---+---+---+
offset | 0 | 1 | 2 | 3 | 4 | 5 |
       +---+---+---+---+---+---+
                         ^   ^
                         |   |
            high watermark   |
        (latest committed)   |
                             |
                       latest offset
                       (uncommitted)

After thinking more about it, I'm hesitant to expose both as options to the user mainly because it adds a lot of cognitive load to the API, but I'm not totally sure what the "right" choice is. Consuming up to the high watermark would guarantee the consumer does not wait for messages but it might miss yet-to-be-committed messages. Consuming up to the latest offset might result in the consumer waiting for messages (which could be prolonged in pathological cases) but would ensure the consumer reads all messages in the log at the time of subscribe. That said, because new messages may be getting added to the log continually, I'm not sure if this matters. However, if the idea is to use #54 to publish new messages, then the latter option might be the correct choice after all.

Does that make sense @alexrudd?
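One way to picture the trade-off is as resolving a StopPosition to a concrete offset at subscribe time. This is only a sketch of the semantics under discussion; the StopHighWatermark variant is hypothetical and not part of the proposed proto:

```go
// StopPosition mirrors the proposed enum; StopHighWatermark is a
// hypothetical addition sketched from the discussion, not the proposal.
type StopPosition int

const (
	StopOffset        StopPosition = iota // stop at an explicit offset
	StopLatest                            // stop at the newest offset
	StopHighWatermark                     // stop at the latest committed offset
)

// resolveStop picks the concrete offset a subscription would stop at,
// given the partition's newest offset and high watermark.
func resolveStop(pos StopPosition, requested, newest, highWatermark int64) int64 {
	switch pos {
	case StopLatest:
		return newest
	case StopHighWatermark:
		return highWatermark
	default:
		return requested
	}
}
```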

@alexrudd
Contributor Author

Yeah I understand your concerns. I'd say that if the stream is being used as the source of truth for what happens in a system (as is the case with event sourcing), then an event hasn't truly happened until it's been committed to the log. In which case I'd say StopPosition_LATEST should be the stream's high watermark. Though that's assuming that messages don't get published to NATS until they're committed?

I think actually that it would be fine either way. If StopPosition_LATEST == latest offset, then that leaves room for adding StopPosition_HIGHWATERMARK in the future, if that option becomes necessary and you're willing to accept the usability cost.

@LaPetiteSouris
Contributor

I guess the first logical step would be to add support for endOffset/stopOffset. Once that is in place, it is easier to reason about, as a new RPC method to retrieve the entire history would be no more than an abstraction layer on top.

I agree with the opinion above that exposing only HighWatermark is not sufficient if we want to retrieve the entire stream and then do a write. In that case, and presuming optimistic concurrency control is in place, the write would never be accepted anyway. Thus, IMO, NewestOffset makes more sense. In the future, once stopOffset support is there and there is a real need for retrieving the stream up to HighWatermark, that can be added.

@tylertreat
Member

Though that's assuming that messages don't get published to NATS until they're committed?

@alexrudd I'm not sure I understand what you're asking here. Are you referring to message acks?

I think actually that it would be fine either way. If StopPosition_LATEST == latest offset, then that leaves room for adding StopPosition_HIGHWATERMARK in the future, if that option becomes necessary and you're willing to accept the usability cost.

I think I'm leaning towards this for the same reason @LaPetiteSouris said above.

@alexrudd
Contributor Author

alexrudd commented Oct 27, 2020

I guess the first logical step would be to add support for endOffset/stopOffset.

Have opened a small PR for this #282

@alexrudd I'm not sure I understand what you're asking here. Are you referring to message acks?

I probably just need to read the docs, but my assumption is that a publish to Liftbridge results in:

  1. Message added to partition
  2. Message committed to log
  3. Message published to the underlying NATS server

My question was whether the publish to NATS happens before or after the commit to the log.

@tylertreat
Member

Liftbridge streams actually consume from NATS subjects. The flow is basically this:

client publish -> Liftbridge publish -> NATS -> Liftbridge stream -> replication -> commit -> ack

Alternatively, you can also publish directly to the NATS subject (this allows Liftbridge to record any plain old NATS subject):

client publish -> NATS -> Liftbridge stream -> replication -> commit -> ack

Messages are committed once replication completes. Acks are sent via NATS and proxied by Liftbridge if using the publish APIs.

Will take a look at the PR in a bit.

@alexrudd
Contributor Author

Liftbridge streams actually consume from NATS subjects. The flow is basically this:

client publish -> Liftbridge publish -> NATS -> Liftbridge stream -> replication -> commit -> ack

Was just thinking about this in regards to optimistic concurrency (#54). I guess this means that messages would be published to NATS regardless of whether they pass an optimistic concurrency check?

@LaPetiteSouris
Contributor

Liftbridge streams actually consume from NATS subjects. The flow is basically this:

client publish -> Liftbridge publish -> NATS -> Liftbridge stream -> replication -> commit -> ack

Was just thinking about this in regards to optimistic concurrency (#54). I guess this means that messages would be published to NATS regardless of whether they pass an optimistic concurrency check?

It is still under discussion, but it's very likely that will be the case; as of now, it seems we don't have many other choices.

Refer here FYI: liftbridge-io/liftbridge-api#46

@tylertreat
Member

I guess this means that messages would be published to NATS regardless of whether they pass an optimistic concurrency check?

That is right. The optimistic concurrency check would apply to the Liftbridge log, not the NATS subject. If you wanted that check to apply to a NATS subject, you could put a Liftbridge stream in front of the NATS subject:

publisher -> NATS (Liftbridge stream subject) -> Liftbridge stream (w/ OCC) -> consumer -> NATS (OCC subject)

I suppose an interesting point for discussion is whether it would be useful to have the ability for Liftbridge to "forward" committed stream messages to a separate NATS subject. I have not put any thought into that though.

@alexrudd
Contributor Author

If you wanted that check to apply to a NATS subject, you could put a Liftbridge stream in front of the NATS subject:

This is an interesting idea. I'll keep an eye on the OCC work as that's pretty exciting.

Now that #286 is merged, happy for this to be closed
