Skip to content

Commit

Permalink
feat(event): add handler.Workers() option
Browse files Browse the repository at this point in the history
refactor(handler.go): improve comments for better code understanding
feat(handler.go): add Startup function to handle events on startup
feat(handler.go): add Workers function to set number of workers for event processing
feat(handler.go): modify Run function to process events concurrently
feat(handler.go): add handleEvents function for concurrent event processing
feat(handler.go): modify startup function to handle startup events concurrently
refactor(handler.go): rename store to startupStore for better clarity
refactor(handler.go): rename handleStoredEvents to startup for better clarity
refactor(handler.go): deprecate WithStore function, use Startup instead
  • Loading branch information
bounoable committed Sep 20, 2023
1 parent 5f0ca91 commit bb69ef0
Showing 1 changed file with 113 additions and 62 deletions.
175 changes: 113 additions & 62 deletions event/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,71 @@ import (
"github.com/modernice/goes/internal/concurrent"
)

// ErrRunning is the error returned when trying to run an event handler that is
// already running.
// ErrRunning is an error that indicates an event handler is already running. It
// is returned when attempting to run a handler that is currently active. This
// error serves as a guard against concurrent execution of the same event
// handler.
var ErrRunning = errors.New("event handler is already running")

// Handler is responsible for managing the registration and execution of event
// handlers in response to incoming events. It maintains a map of registered
// event handlers and listens to an event bus for events that match registered
// handler names. When such an event occurs, the corresponding handler function
// is invoked. Handler also provides support for querying stored events from an
// event store, and it can run concurrently, with its execution state being
// controlled through a provided context.
// Handler is a type that processes events from an event bus. It associates
// event names with specific functions, which are called whenever their
// respective event occurs. Handler uses multiple workers to process events
// concurrently. The number of workers can be customized through options when
// creating a new Handler instance. Events can be registered to the Handler, and
// it provides methods to check if it's currently processing events or if a
// certain event has a registered handler. Handlers also have a context which
// can be used for synchronization and cancellation of operations. Handlers
// prevent concurrent execution of the same instance to avoid race conditions.
type Handler struct {
bus event.Bus
store event.Store
bus event.Bus
startupStore event.Store
workers int

mux sync.RWMutex
handlers map[string]func(event.Event)
eventNames map[string]struct{}
ctx context.Context
}

// Option is a function that modifies the configuration of a [Handler]. It
// provides a way to set optional parameters when creating a new [Handler]. This
// approach is used to avoid constructor cluttering when there are many
// configurations possible for a [Handler].
// Option is a function type used to configure a [Handler]. It can be used to
// set various properties of the [Handler] such as the event store or the number
// of workers. Each option is applied in the order they are provided when
// constructing a new [Handler] using the New function.
type Option func(*Handler)

// WithStore is a function that returns an Option which, when applied to a
// Handler, sets the event.Store of that Handler. This is used to configure
// where the Handler stores events.
// Startup sets the startup event store for a [Handler]. This store is used to
// handle events when the [Handler] starts up. The Startup option is typically
// used to initialize the system with initial event handling on startup or
// implement a "catch-up" mechanism for their event handlers.
func Startup(store event.Store) Option {
return func(h *Handler) {
h.startupStore = store
}
}

// WithStore is an [Option] for a [Handler] that sets the event store to be
// used. This function returns an [Option] that, when used with the New
// function, configures a [Handler] to use the specified event store. This is
// typically used to specify where events should be stored when they are handled
// by the [Handler]. Note that WithStore is equivalent to the Startup function.
//
// Deprecated: Use Startup instead.
func WithStore(store event.Store) Option {
return Startup(store)
}

// Workers sets the number of workers that a [Handler] uses to process events.
// If this option is not used when constructing a new [Handler], the default
// number of workers is 1.
func Workers(n int) Option {
return func(h *Handler) {
h.store = store
h.workers = n
}
}

// New creates a new Handler with the provided event bus and applies the given
// options. The bus parameter is used to subscribe to events and distribute them
// to registered event handlers. Options can be used to configure the Handler,
// for example to specify an event store where past events are queried from on
// startup. The returned Handler is not running and must be started with the Run
// method.
// New creates a new event handler with the provided bus and options. It sets up
// an empty map for handlers and event names, applies the given options, and
// ensures that there is at least one worker. The new handler is returned.
func New(bus event.Bus, opts ...Option) *Handler {
h := &Handler{
bus: bus,
Expand All @@ -63,56 +86,57 @@ func New(bus event.Bus, opts ...Option) *Handler {
for _, opt := range opts {
opt(h)
}
if h.workers < 1 {
h.workers = 1
}
return h
}

// RegisterEventHandler registers an event handler function for the given event
// name. The event handler function, which takes an [event.Event] as a
// parameter, will be called whenever an event with the registered name occurs.
// The function is thread-safe and can be called concurrently.
// RegisterEventHandler associates the provided event name with a given function
// to handle that event. The function will be called whenever an event with the
// associated name occurs. This method is safe for concurrent use.
func (h *Handler) RegisterEventHandler(name string, fn func(event.Event)) {
h.mux.Lock()
defer h.mux.Unlock()
h.handlers[name] = fn
h.eventNames[name] = struct{}{}
}

// EventHandler retrieves the event handling function associated with the
// provided name. The function returned is responsible for processing an event
// of that name. If no handler is found for the given name, a nil function and
// false are returned.
// EventHandler retrieves the event handler function associated with the
// provided event name. If a handler for the given event name is found, it
// returns the handler function and true. If no handler is found, it returns nil
// and false. This method is safe for concurrent use.
func (h *Handler) EventHandler(name string) (func(event.Event), bool) {
h.mux.RLock()
defer h.mux.RUnlock()
fn, ok := h.handlers[name]
return fn, ok
}

// Context returns the context of the Handler. This context is set when the Run
// method is called and is used to control the lifecycle of the Handler. It's
// protected by a read-write lock, ensuring safe concurrent access.
// Context returns the context of the Handler. This context is used for
// synchronization and cancellation of operations within the Handler. If the
// Handler is not running, nil is returned.
func (h *Handler) Context() context.Context {
h.mux.RLock()
defer h.mux.RUnlock()
return h.ctx
}

// Running checks if the Handler is currently running. It returns true if the
// Handler has been started and not yet stopped, otherwise it returns false. The
// context of the Handler is used to determine its state. If the context is not
// nil, it indicates that the Handler is running.
// Running checks if the Handler is currently processing events. It returns true
// if the Handler is running and false otherwise. This method is safe for
// concurrent use.
func (h *Handler) Running() bool {
h.mux.RLock()
defer h.mux.RUnlock()
return h.ctx != nil
}

// Run starts the event handler with the provided context. It subscribes to all
// registered events on the event bus and begins handling incoming events. If
// the event handler has an event store, it also handles all stored events. If
// the handler is already running, it returns an error. The function returns a
// channel for errors that may occur during event handling and an error if there
// was an issue starting the handler.
// Run starts the event handling process for the [Handler]. It subscribes to the
// events that have been registered with the [Handler] and starts processing
// them concurrently with a number of worker goroutines. The errors from running
// the handlers are returned through a channel. If the [Handler] is already
// running when this method is called, it returns an error. The context passed
// to Run is used to control cancellation of the event handling process.
func (h *Handler) Run(ctx context.Context) (<-chan error, error) {
h.mux.Lock()
defer h.mux.Unlock()
Expand All @@ -133,43 +157,70 @@ func (h *Handler) Run(ctx context.Context) (<-chan error, error) {
return nil, fmt.Errorf("subscribe to events: %w [events=%v]", err, eventNames)
}

out, fail := concurrent.Errors(ctx)
queueError, fail := concurrent.Errors(ctx)
queue := make(chan event.Event)

go func() {
defer close(queue)
if err := streams.Walk(ctx, func(evt event.Event) error {
if fn, ok := h.EventHandler(evt.Name()); ok {
fn(evt)
select {
case <-ctx.Done():
return ctx.Err()
case queue <- evt:
return nil
}
return nil
}, events, errs); !errors.Is(err, context.Canceled) {
fail(err)
}
}()

if h.store != nil {
out := streams.FanInAll(queueError, h.handleEvents(ctx, queue))

if h.startupStore != nil {
go func() {
if err := h.handleStoredEvents(ctx, eventNames); err != nil {
fail(fmt.Errorf("handle events from store: %w", err))
if err := h.startup(ctx, eventNames); err != nil {
fail(fmt.Errorf("startup handler: %w", err))
}
}()
}

return out, nil
}

func (h *Handler) handleStoredEvents(ctx context.Context, eventNames []string) error {
str, errs, err := h.store.Query(ctx, query.New(
func (h *Handler) handleEvents(ctx context.Context, events <-chan event.Event) <-chan error {
errs, fail := concurrent.Errors(ctx)
var wg sync.WaitGroup
for i := 0; i < h.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for evt := range events {
fn, ok := h.EventHandler(evt.Name())
if !ok {
fail(fmt.Errorf("no handler for event %q", evt.Name()))
continue
}
fn(evt)
}
}()
}
return errs
}

func (h *Handler) startup(ctx context.Context, eventNames []string) error {
str, errs, err := h.startupStore.Query(ctx, query.New(
query.Name(eventNames...),
query.SortByTime(),
))
if err != nil {
return fmt.Errorf("query %s events: %w", eventNames, err)
return fmt.Errorf("query events %v: %w", eventNames, err)
}

return streams.Walk(ctx, func(evt event.Event) error {
if fn, ok := h.EventHandler(evt.Name()); ok {
fn(evt)
}
return nil
}, str, errs)
handlerErrors := h.handleEvents(ctx, str)

for err := range streams.FanInAll(errs, handlerErrors) {
return err
}

return nil
}

0 comments on commit bb69ef0

Please sign in to comment.