Skip to content

Commit

Permalink
refactor(handler.go): improve comments for better code understanding
Browse files Browse the repository at this point in the history
- Update comments for ErrRunning, Handler, Option, WithStore, New, RegisterEventHandler, EventHandler, Context, Running, and Run functions to provide more detailed and clear explanations.
- This will help developers understand the code better and make it easier for them to work with the event handler.

fix(stream.go): update Filter function comment for better clarity
- Change "pass all the" to "match all the" in the comment for the Filter function to make it more clear that the function filters events that match all the provided queries.
  • Loading branch information
bounoable committed Aug 22, 2023
1 parent 7826f58 commit 9005f1a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
67 changes: 44 additions & 23 deletions event/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,48 @@ import (
"github.com/modernice/goes/internal/concurrent"
)

// ErrRunning is returned when trying to run a *Handler that is already running.
// ErrRunning is the error returned when trying to run an event handler that is
// already running.
var ErrRunning = errors.New("event handler is already running")

// A Handler asynchronously handles published events.
//
// var bus event.Bus
// h := handler.New(bus)
// event.HandleWith(h, func(evt event.Of[FooData]) {...}, "foo")
// event.HandleWith(h, func(evt event.Of[BarData]) {...}, "bar")
//
// errs, err := h.Run(context.TODO())
// Handler is responsible for managing the registration and execution of event
// handlers in response to incoming events. It maintains a map of registered
// event handlers and listens to an event bus for events that match registered
// handler names. When such an event occurs, the corresponding handler function
// is invoked. Handler also provides support for querying stored events from an
// event store, and it can run concurrently, with its execution state being
// controlled through a provided context.
type Handler struct {
bus event.Bus
store event.Store // optional
store event.Store

mux sync.RWMutex
handlers map[string]func(event.Event)
eventNames map[string]struct{}
ctx context.Context
}

// Option is a handler option.
// Option is a function that modifies the configuration of a [Handler]. It
// provides a way to set optional parameters when creating a new [Handler]. This
// approach is used to avoid constructor cluttering when there are many
// configurations possible for a [Handler].
type Option func(*Handler)

// WithStore returns an Option that makes a Handler query the registered events
// from the provided store on startup and handle them as if they were published
// over the underlying event bus. This allows to handle "past" events on startup.
// WithStore is a function that returns an Option which, when applied to a
// Handler, sets the event.Store of that Handler. This is used to configure
// where the Handler stores events.
func WithStore(store event.Store) Option {
return func(h *Handler) {
h.store = store
}
}

// New returns an event handler for published events.
// New creates a new Handler with the provided event bus and applies the given
// options. The bus parameter is used to subscribe to events and distribute them
// to registered event handlers. Options can be used to configure the Handler,
// for example to specify an event store where past events are queried from on
// startup. The returned Handler is not running and must be started with the Run
// method.
func New(bus event.Bus, opts ...Option) *Handler {
h := &Handler{
bus: bus,
Expand All @@ -58,40 +66,53 @@ func New(bus event.Bus, opts ...Option) *Handler {
return h
}

// RegisterEventHandler registers the handler for the given event.
// Events must be registered before h.Run() is called. Events that are
// registered after h.Run() has been called, won't be handled.
// RegisterEventHandler registers an event handler function for the given event
// name. The event handler function, which takes an [event.Event] as a
// parameter, will be called whenever an event with the registered name occurs.
// The function is thread-safe and can be called concurrently.
func (h *Handler) RegisterEventHandler(name string, fn func(event.Event)) {
h.mux.Lock()
defer h.mux.Unlock()
h.handlers[name] = fn
h.eventNames[name] = struct{}{}
}

// EventHandler returns the event handler for the given event name.
// EventHandler retrieves the event handling function associated with the
// provided name. The function returned is responsible for processing an event
// of that name. If no handler is found for the given name, a nil function and
// false are returned.
func (h *Handler) EventHandler(name string) (func(event.Event), bool) {
h.mux.RLock()
defer h.mux.RUnlock()
fn, ok := h.handlers[name]
return fn, ok
}

// Context returns the context that was passed to h.Run(). If h.Run() has not
// been called yet, nil is returned.
// Context returns the context of the Handler. This context is set when the Run
// method is called and is used to control the lifecycle of the Handler. It's
// protected by a read-write lock, ensuring safe concurrent access.
func (h *Handler) Context() context.Context {
h.mux.RLock()
defer h.mux.RUnlock()
return h.ctx
}

// Running returns whether the handler is currently running.
// Running checks if the Handler is currently running. It returns true if the
// Handler has been started and not yet stopped, otherwise it returns false. The
// context of the Handler is used to determine its state. If the context is not
// nil, it indicates that the Handler is running.
func (h *Handler) Running() bool {
h.mux.RLock()
defer h.mux.RUnlock()
return h.ctx != nil
}

// Run runs the handler until ctx is canceled.
// Run starts the event handler with the provided context. It subscribes to all
// registered events on the event bus and begins handling incoming events. If
// the event handler has an event store, it also handles all stored events. If
// the handler is already running, it returns an error. The function returns a
// channel for errors that may occur during event handling and an error if there
// was an issue starting the handler.
func (h *Handler) Run(ctx context.Context) (<-chan error, error) {
h.mux.Lock()
defer h.mux.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion event/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// Filter filters the events from the given input channel based on the provided
// queries and returns a new channel with only the events that pass all the
// queries and returns a new channel with only the events that match all the
// queries. If no queries are provided, the input channel is returned unchanged.
func Filter[D any](events <-chan Of[D], queries ...Query) <-chan Of[D] {
if len(queries) == 0 {
Expand Down

0 comments on commit 9005f1a

Please sign in to comment.