Skip to content

Commit

Permalink
fix(event): Waiting for event bus to terminate for closing
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <[email protected]>
  • Loading branch information
ViBiOh committed Sep 14, 2022
1 parent 7d64e53 commit c9b7e15
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/fibr/fibr.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,5 @@ func main() {
go appServer.Start("http", healthApp.End(), httputils.Handler(handler, healthApp, recoverer.Middleware, prometheusApp.Middleware, tracerApp.Middleware, owasp.New(owaspConfig).Middleware))

healthApp.WaitForTermination(appServer.Done())
server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpShareApp.Done(), amqpWebhookApp.Done())
server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpShareApp.Done(), amqpWebhookApp.Done(), eventBus.Done())
}
38 changes: 23 additions & 15 deletions pkg/provider/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ type EventBus struct {
tracer trace.Tracer
counter *prometheus.CounterVec
bus chan Event
closed chan struct{}
done chan struct{}
}

Expand All @@ -287,6 +288,7 @@ func NewEventBus(size uint64, prometheusRegisterer prometheus.Registerer, tracer
}

return EventBus{
closed: make(chan struct{}),
done: make(chan struct{}),
bus: make(chan Event, size),
counter: counter,
Expand All @@ -302,39 +304,45 @@ func (e EventBus) increaseMetric(event Event, state string) {
e.counter.WithLabelValues(event.Type.String(), state).Inc()
}

func (e EventBus) Done() <-chan struct{} {
return e.done
}

func (e EventBus) Push(event Event) error {
select {
case <-e.done:
case <-e.closed:
e.increaseMetric(event, "refused")
return errors.New("done signal is received")
return errors.New("bus is closed")
case e.bus <- event:
e.increaseMetric(event, "push")
return nil
}
}

func (e EventBus) Start(done <-chan struct{}, storageApp absto.Storage, renamers []Renamer, consumers ...EventConsumer) {
defer close(e.bus)
defer close(e.done)

go func() {
for event := range e.bus {
ctx, end := tracer.StartSpan(context.Background(), e.tracer, "event", trace.WithAttributes(attribute.String("type", event.Type.String())))
defer close(e.bus)
defer close(e.closed)

if event.Type == RenameEvent && event.Item.IsDir {
RenameDirectory(ctx, storageApp, renamers, event.Item, *event.New)
}
<-done
}()

for _, consumer := range consumers {
consumer(ctx, event)
}
for event := range e.bus {
ctx, end := tracer.StartSpan(context.Background(), e.tracer, "event", trace.WithAttributes(attribute.String("type", event.Type.String())))

end()
e.increaseMetric(event, "done")
if event.Type == RenameEvent && event.Item.IsDir {
RenameDirectory(ctx, storageApp, renamers, event.Item, *event.New)
}

for _, consumer := range consumers {
consumer(ctx, event)
}
}()

<-done
end()
e.increaseMetric(event, "done")
}
}

func RenameDirectory(ctx context.Context, storageApp absto.Storage, renamers []Renamer, old, new absto.Item) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func New(config Config, storageApp absto.Storage, amqpClient *amqp.Client) (*App
}, nil
}

// Exclusive does action on shares with exclusive lock
func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration, action func(ctx context.Context) error) (bool, error) {
fn := func() error {
a.Lock()
Expand Down Expand Up @@ -111,7 +110,6 @@ exclusive:
return true, nil
}

// Get returns a share based on path
func (a *App) Get(requestPath string) provider.Share {
cleanPath := strings.TrimPrefix(requestPath, "/")

Expand All @@ -127,7 +125,6 @@ func (a *App) Get(requestPath string) provider.Share {
return provider.Share{}
}

// Start worker
func (a *App) Start(done <-chan struct{}) {
if err := a.loadShares(context.Background()); err != nil {
logger.Error("refresh shares: %s", err)
Expand Down
2 changes: 0 additions & 2 deletions pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu
}, nil
}

// Exclusive does action on webhook with exclusive lock
func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration, action func(ctx context.Context) error) error {
fn := func() error {
a.Lock()
Expand Down Expand Up @@ -125,7 +124,6 @@ exclusive:
return nil
}

// Start worker
func (a *App) Start(_ <-chan struct{}) {
a.Lock()
defer a.Unlock()
Expand Down

0 comments on commit c9b7e15

Please sign in to comment.