Skip to content

Commit

Permalink
refactor(eventbustest/core.go): increase event timeout from 50ms to 5…
Browse files Browse the repository at this point in the history
…00ms for better test reliability

refactor(eventbustest/core.go): simplify context handling in CancelSubscription test for better readability
feat(eventbustest/expect.go): add t.Helper() to Apply method to improve test output
refactor(eventbustest/expect.go): enhance Closed method with more detailed error messages and better timeout handling
refactor(eventbus/bus.go): move New function and add Close method for better structuring and resource management
refactor(eventbus/bus.go): improve unsubscribe handling in work method for better resource cleanup
  • Loading branch information
bounoable committed Sep 21, 2023
1 parent ee66a9a commit 0d8a1f3
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 26 deletions.
12 changes: 5 additions & 7 deletions backend/testing/eventbustest/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func CancelSubscription(t *testing.T, newBus EventBusFactory, opts ...Option) {

// When publishing a "foo" event, the event should be received
ex := Expect(ctx)
ex.Event(sub, 50*time.Millisecond, "foo")
ex.Event(sub, 500*time.Millisecond, "foo")

if err := bus.Publish(ctx, event.New("foo", test.FooEventData{}).Any()); err != nil {
t.Fatalf("publish event: %v [event=%v]", err, "foo")
Expand All @@ -180,15 +180,13 @@ func CancelSubscription(t *testing.T, newBus EventBusFactory, opts ...Option) {

// When the subscription ctx is canceled
cancel()
<-time.After(10 * time.Millisecond)

// And another "foo" event is published, the event should not be received
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

ex = Expect(ctx)
ex.Closed(sub, 200*time.Millisecond)
ex = Expect(context.Background())
ex.Closed(sub, 500*time.Millisecond)

if err := bus.Publish(ctx, event.New("foo", test.FooEventData{}).Any()); err != nil {
if err := bus.Publish(context.Background(), event.New("foo", test.FooEventData{}).Any()); err != nil {
t.Fatalf("publish event: %v [event=%v]", err, "foo")
}

Expand Down
59 changes: 55 additions & 4 deletions backend/testing/eventbustest/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (ex *Expectations) Result() error {
// Apply applies the result to the provided test. If the result is an error,
// t.Fatal(err) is called.
func (ex *Expectations) Apply(t *testing.T) {
t.Helper()
if err := ex.Result(); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -198,20 +199,43 @@ func (ex *Expectations) Events(sub Subscription, timeout time.Duration, names ..
// within the given duration.
func (ex *Expectations) Closed(sub Subscription, timeout time.Duration) {
count := ex.init()

go func() {
defer ex.wg.Done()

start := time.Now()

// Create a timer for timeout
timer := time.NewTimer(timeout)
defer timer.Stop()

// Check events channel
select {
case <-ex.ctx.Done():
return
case <-time.After(timeout):
ex.err("closed", count, fmt.Errorf("channels not closed [duration=%v]", time.Since(start)))
case <-timer.C:
ex.err("closed", count, fmt.Errorf("channels not closed [duration=%v]", timeout))
return
case _, ok := <-sub.events:
case evt, ok := <-sub.events:
if ok {
ex.err("closed", count, fmt.Errorf("event channel not closed [duration=%v]", time.Since(start)))
ex.err("closed", count, fmt.Errorf("event channel not closed [duration=%v]\n%s", time.Since(start), evt))
return
}
}

// Reset the timer for the next check
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)

// Check errs channel
select {
case <-ex.ctx.Done():
return
case <-timer.C:
ex.err("closed", count, fmt.Errorf("channels not closed [duration=%v]", timeout))
return
case _, ok := <-sub.errs:
if ok {
ex.err("closed", count, fmt.Errorf("error channel not closed [duration=%v]", time.Since(start)))
Expand All @@ -221,6 +245,33 @@ func (ex *Expectations) Closed(sub Subscription, timeout time.Duration) {
}()
}

// // Closed expects the event and error channels of a subscription to be closed
// // within the given duration.
// func (ex *Expectations) Closed(sub Subscription, timeout time.Duration) {
// count := ex.init()
// go func() {
// defer ex.wg.Done()
// start := time.Now()
// select {
// case <-ex.ctx.Done():
// return
// case <-time.After(timeout):
// ex.err("closed", count, fmt.Errorf("channels not closed [duration=%v]", time.Since(start)))
// return
// case _, ok := <-sub.events:
// if ok {
// ex.err("closed", count, fmt.Errorf("event channel not closed [duration=%v]", time.Since(start)))
// return
// }
// case _, ok := <-sub.errs:
// if ok {
// ex.err("closed", count, fmt.Errorf("error channel not closed [duration=%v]", time.Since(start)))
// return
// }
// }
// }()
// }

func (ex *Expectations) init() int64 {
ex.wg.Add(1)
ex.once.Do(func() { go ex.work() })
Expand Down
41 changes: 26 additions & 15 deletions event/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ import (
"github.com/modernice/goes/event"
)

// New creates and returns a new event.Bus backed by a channel-based
// implementation. The returned event.Bus allows subscribing to events,
// publishing events, and managing event subscriptions.
func New() event.Bus {
bus := &chanbus{
events: make(map[string]*eventSubscription),
queue: make(chan event.Event),
}
go bus.work()
return bus
}

type chanbus struct {
sync.RWMutex

Expand Down Expand Up @@ -50,6 +38,27 @@ type subscribeJob struct {
done chan struct{}
}

// New creates and returns a new event.Bus backed by a channel-based
// implementation. The returned event.Bus allows subscribing to events,
// publishing events, and managing event subscriptions.
func New() event.Bus {
bus := &chanbus{
events: make(map[string]*eventSubscription),
queue: make(chan event.Event),
done: make(chan struct{}),
}
go bus.work()
return bus
}

func (bus *chanbus) Close() {
select {
case <-bus.done:
default:
close(bus.done)
}
}

// Subscribe creates a subscription for the specified events and returns
// channels for receiving the events and errors. The subscription is
// automatically unsubscribed when the provided context is canceled.
Expand Down Expand Up @@ -168,10 +177,12 @@ func (sub *eventSubscription) subscribe(ctx context.Context) (recipient, error)
go func() {
<-ctx.Done()
close(rcpt.unsubbed)
sub.unsubscribeQueue <- subscribeJob{
job := subscribeJob{
rcpt: rcpt,
done: make(chan struct{}),
}
sub.unsubscribeQueue <- job
<-job.done
}()
}()

Expand All @@ -190,10 +201,10 @@ func (sub *eventSubscription) work() {
sub.recipients = append(sub.recipients, job.rcpt)
close(job.done)
case job := <-sub.unsubscribeQueue:
close(job.rcpt.events)
close(job.rcpt.errs)
for i, rcpt := range sub.recipients {
if rcpt == job.rcpt {
close(rcpt.errs)
close(rcpt.events)
sub.recipients = append(sub.recipients[:i], sub.recipients[i+1:]...)
break
}
Expand Down

0 comments on commit 0d8a1f3

Please sign in to comment.