|
| 1 | +package aggregate |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "sync" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/pkg/errors" |
| 10 | + "github.com/tochemey/ego/egopb" |
| 11 | + "github.com/tochemey/ego/store" |
| 12 | + "github.com/tochemey/goakt/actors" |
| 13 | + "go.uber.org/atomic" |
| 14 | + "google.golang.org/protobuf/proto" |
| 15 | + "google.golang.org/protobuf/types/known/anypb" |
| 16 | + "google.golang.org/protobuf/types/known/timestamppb" |
| 17 | +) |
| 18 | + |
| 19 | +type Command proto.Message |
| 20 | +type Event proto.Message |
| 21 | +type State proto.Message |
| 22 | + |
| 23 | +// Behavior defines an event sourced behavior when modeling a CQRS Behavior. |
| 24 | +type Behavior[T State] interface { |
| 25 | + // ID defines the id that will be used in the event journal. |
| 26 | + // This helps track the aggregate in the events store. |
| 27 | + ID() string |
| 28 | + // InitialState returns the event sourced actor initial state |
| 29 | + InitialState() T |
| 30 | + // HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command, |
| 31 | + // which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can |
| 32 | + // be returned as a no-op. Command handlers are the meat of the event sourced actor. |
| 33 | + // They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency. |
| 34 | + // The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state. |
| 35 | + // Any decision should be solely based on the data passed in the commands and the state of the Behavior. |
| 36 | + // In case of successful validation, one or more events expressing the mutations are persisted. |
| 37 | + // Once the events are persisted, they are applied to the state producing a new valid state. |
| 38 | + HandleCommand(ctx context.Context, command Command, priorState T) (event Event, err error) |
| 39 | + // HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it. |
| 40 | + // Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal. |
| 41 | + HandleEvent(ctx context.Context, event Event, priorState T) (state T, err error) |
| 42 | +} |
| 43 | + |
| 44 | +// Aggregate is an event sourced based actor |
| 45 | +type Aggregate[T State] struct { |
| 46 | + Behavior[T] |
| 47 | + // specifies the events store |
| 48 | + eventsStore store.EventsStore |
| 49 | + // specifies the current state |
| 50 | + currentState T |
| 51 | + |
| 52 | + eventsCounter *atomic.Uint64 |
| 53 | + lastCommandTime time.Time |
| 54 | + mu sync.RWMutex |
| 55 | +} |
| 56 | + |
| 57 | +// enforce compilation error |
| 58 | +var _ actors.Actor = &Aggregate[State]{} |
| 59 | + |
| 60 | +// New creates an instance of Aggregate provided the eventSourcedHandler and the events store |
| 61 | +func New[T State](behavior Behavior[T], eventsStore store.EventsStore) *Aggregate[T] { |
| 62 | + // create an instance of aggregate and return it |
| 63 | + return &Aggregate[T]{ |
| 64 | + eventsStore: eventsStore, |
| 65 | + Behavior: behavior, |
| 66 | + eventsCounter: atomic.NewUint64(0), |
| 67 | + mu: sync.RWMutex{}, |
| 68 | + } |
| 69 | +} |
| 70 | + |
| 71 | +// PreStart pre-starts the actor |
| 72 | +// At this stage we connect to the various stores |
| 73 | +func (a *Aggregate[T]) PreStart(ctx context.Context) error { |
| 74 | + // add a span context |
| 75 | + //ctx, span := telemetry.SpanContext(ctx, "PreStart") |
| 76 | + //defer span.End() |
| 77 | + // acquire the lock |
| 78 | + a.mu.Lock() |
| 79 | + // release lock when done |
| 80 | + defer a.mu.Unlock() |
| 81 | + |
| 82 | + // connect to the various stores |
| 83 | + if a.eventsStore == nil { |
| 84 | + return errors.New("events store is not defined") |
| 85 | + } |
| 86 | + |
| 87 | + // call the connect method of the journal store |
| 88 | + if err := a.eventsStore.Connect(ctx); err != nil { |
| 89 | + return fmt.Errorf("failed to connect to the events store: %v", err) |
| 90 | + } |
| 91 | + |
| 92 | + // check whether there is a snapshot to recover from |
| 93 | + if err := a.recoverFromSnapshot(ctx); err != nil { |
| 94 | + return errors.Wrap(err, "failed to recover from snapshot") |
| 95 | + } |
| 96 | + return nil |
| 97 | +} |
| 98 | + |
| 99 | +// Receive processes any message dropped into the actor mailbox. |
| 100 | +func (a *Aggregate[T]) Receive(ctx actors.ReceiveContext) { |
| 101 | + // grab the context |
| 102 | + goCtx := ctx.Context() |
| 103 | + // add a span context |
| 104 | + //goCtx, span := telemetry.SpanContext(ctx.Context(), "Receive") |
| 105 | + //defer span.End() |
| 106 | + |
| 107 | + // acquire the lock |
| 108 | + a.mu.Lock() |
| 109 | + // release lock when done |
| 110 | + defer a.mu.Unlock() |
| 111 | + |
| 112 | + // grab the command sent |
| 113 | + switch command := ctx.Message().(type) { |
| 114 | + case *egopb.GetStateCommand: |
| 115 | + // let us fetch the latest journal |
| 116 | + latestEvent, err := a.eventsStore.GetLatestEvent(goCtx, a.ID()) |
| 117 | + // handle the error |
| 118 | + if err != nil { |
| 119 | + // create a new error reply |
| 120 | + reply := &egopb.CommandReply{ |
| 121 | + Reply: &egopb.CommandReply_ErrorReply{ |
| 122 | + ErrorReply: &egopb.ErrorReply{ |
| 123 | + Message: err.Error(), |
| 124 | + }, |
| 125 | + }, |
| 126 | + } |
| 127 | + // send the response |
| 128 | + ctx.Response(reply) |
| 129 | + return |
| 130 | + } |
| 131 | + |
| 132 | + // reply with the state unmarshalled |
| 133 | + resultingState := latestEvent.GetResultingState() |
| 134 | + reply := &egopb.CommandReply{ |
| 135 | + Reply: &egopb.CommandReply_StateReply{ |
| 136 | + StateReply: &egopb.StateReply{ |
| 137 | + PersistenceId: a.ID(), |
| 138 | + State: resultingState, |
| 139 | + SequenceNumber: latestEvent.GetSequenceNumber(), |
| 140 | + Timestamp: latestEvent.GetTimestamp(), |
| 141 | + }, |
| 142 | + }, |
| 143 | + } |
| 144 | + |
| 145 | + // send the response |
| 146 | + ctx.Response(reply) |
| 147 | + default: |
| 148 | + // pass the received command to the command handler |
| 149 | + event, err := a.HandleCommand(goCtx, command, a.currentState) |
| 150 | + // handle the command handler error |
| 151 | + if err != nil { |
| 152 | + // create a new error reply |
| 153 | + reply := &egopb.CommandReply{ |
| 154 | + Reply: &egopb.CommandReply_ErrorReply{ |
| 155 | + ErrorReply: &egopb.ErrorReply{ |
| 156 | + Message: err.Error(), |
| 157 | + }, |
| 158 | + }, |
| 159 | + } |
| 160 | + // send the response |
| 161 | + ctx.Response(reply) |
| 162 | + return |
| 163 | + } |
| 164 | + |
| 165 | + // if the event is nil nothing is persisted, and we return no reply |
| 166 | + if event == nil { |
| 167 | + // create a new error reply |
| 168 | + reply := &egopb.CommandReply{ |
| 169 | + Reply: &egopb.CommandReply_NoReply{ |
| 170 | + NoReply: &egopb.NoReply{}, |
| 171 | + }, |
| 172 | + } |
| 173 | + // send the response |
| 174 | + ctx.Response(reply) |
| 175 | + return |
| 176 | + } |
| 177 | + |
| 178 | + // process the event by calling the event handler |
| 179 | + resultingState, err := a.HandleEvent(goCtx, event, a.currentState) |
| 180 | + // handle the event handler error |
| 181 | + if err != nil { |
| 182 | + // create a new error reply |
| 183 | + reply := &egopb.CommandReply{ |
| 184 | + Reply: &egopb.CommandReply_ErrorReply{ |
| 185 | + ErrorReply: &egopb.ErrorReply{ |
| 186 | + Message: err.Error(), |
| 187 | + }, |
| 188 | + }, |
| 189 | + } |
| 190 | + // send the response |
| 191 | + ctx.Response(reply) |
| 192 | + return |
| 193 | + } |
| 194 | + |
| 195 | + // increment the event counter |
| 196 | + a.eventsCounter.Inc() |
| 197 | + |
| 198 | + // set the current state for the next command |
| 199 | + a.currentState = resultingState |
| 200 | + |
| 201 | + // marshal the event and the resulting state |
| 202 | + marshaledEvent, _ := anypb.New(event) |
| 203 | + marshaledState, _ := anypb.New(resultingState) |
| 204 | + |
| 205 | + sequenceNumber := a.eventsCounter.Load() |
| 206 | + timestamp := timestamppb.Now() |
| 207 | + a.lastCommandTime = timestamp.AsTime() |
| 208 | + |
| 209 | + // create the event |
| 210 | + envelope := &egopb.Event{ |
| 211 | + PersistenceId: a.ID(), |
| 212 | + SequenceNumber: sequenceNumber, |
| 213 | + IsDeleted: false, |
| 214 | + Event: marshaledEvent, |
| 215 | + ResultingState: marshaledState, |
| 216 | + Timestamp: a.lastCommandTime.Unix(), |
| 217 | + } |
| 218 | + |
| 219 | + // create a journal list |
| 220 | + journals := []*egopb.Event{envelope} |
| 221 | + |
| 222 | + // TODO persist the event in batch using a child actor |
| 223 | + if err := a.eventsStore.WriteEvents(goCtx, journals); err != nil { |
| 224 | + // create a new error reply |
| 225 | + reply := &egopb.CommandReply{ |
| 226 | + Reply: &egopb.CommandReply_ErrorReply{ |
| 227 | + ErrorReply: &egopb.ErrorReply{ |
| 228 | + Message: err.Error(), |
| 229 | + }, |
| 230 | + }, |
| 231 | + } |
| 232 | + // send the response |
| 233 | + ctx.Response(reply) |
| 234 | + return |
| 235 | + } |
| 236 | + |
| 237 | + reply := &egopb.CommandReply{ |
| 238 | + Reply: &egopb.CommandReply_StateReply{ |
| 239 | + StateReply: &egopb.StateReply{ |
| 240 | + PersistenceId: a.ID(), |
| 241 | + State: marshaledState, |
| 242 | + SequenceNumber: sequenceNumber, |
| 243 | + Timestamp: a.lastCommandTime.Unix(), |
| 244 | + }, |
| 245 | + }, |
| 246 | + } |
| 247 | + |
| 248 | + // send the response |
| 249 | + ctx.Response(reply) |
| 250 | + } |
| 251 | +} |
| 252 | + |
| 253 | +// PostStop prepares the actor to gracefully shutdown |
| 254 | +func (a *Aggregate[T]) PostStop(ctx context.Context) error { |
| 255 | + // add a span context |
| 256 | + //ctx, span := telemetry.SpanContext(ctx, "PostStop") |
| 257 | + //defer span.End() |
| 258 | + |
| 259 | + // acquire the lock |
| 260 | + a.mu.Lock() |
| 261 | + // release lock when done |
| 262 | + defer a.mu.Unlock() |
| 263 | + |
| 264 | + // disconnect the journal |
| 265 | + if err := a.eventsStore.Disconnect(ctx); err != nil { |
| 266 | + return fmt.Errorf("failed to disconnect the events store: %v", err) |
| 267 | + } |
| 268 | + return nil |
| 269 | +} |
| 270 | + |
| 271 | +// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one |
| 272 | +// this is vital when the aggregate actor is restarting. |
| 273 | +func (a *Aggregate[T]) recoverFromSnapshot(ctx context.Context) error { |
| 274 | + // add a span context |
| 275 | + //ctx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot") |
| 276 | + //defer span.End() |
| 277 | + |
| 278 | + // check whether there is a snapshot to recover from |
| 279 | + event, err := a.eventsStore.GetLatestEvent(ctx, a.ID()) |
| 280 | + // handle the error |
| 281 | + if err != nil { |
| 282 | + return errors.Wrap(err, "failed to recover the latest journal") |
| 283 | + } |
| 284 | + |
| 285 | + // we do have the latest state just recover from it |
| 286 | + if event != nil { |
| 287 | + // set the current state |
| 288 | + if err := event.GetResultingState().UnmarshalTo(a.currentState); err != nil { |
| 289 | + return errors.Wrap(err, "failed unmarshal the latest state") |
| 290 | + } |
| 291 | + |
| 292 | + // set the event counter |
| 293 | + a.eventsCounter.Store(event.GetSequenceNumber()) |
| 294 | + return nil |
| 295 | + } |
| 296 | + |
| 297 | + // in case there is no snapshot |
| 298 | + a.currentState = a.InitialState() |
| 299 | + return nil |
| 300 | +} |
0 commit comments