From c9b7e15208db1f6723166d26ea27699daffc4ea6 Mon Sep 17 00:00:00 2001 From: Vincent Boutour Date: Wed, 14 Sep 2022 22:49:03 +0200 Subject: [PATCH] fix(event): Waiting for event bus to terminate for closing Signed-off-by: Vincent Boutour --- cmd/fibr/fibr.go | 2 +- pkg/provider/event.go | 38 +++++++++++++++++++++++--------------- pkg/share/share.go | 3 --- pkg/webhook/webhook.go | 2 -- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/cmd/fibr/fibr.go b/cmd/fibr/fibr.go index c328a116..a98e5b7c 100644 --- a/cmd/fibr/fibr.go +++ b/cmd/fibr/fibr.go @@ -187,5 +187,5 @@ func main() { go appServer.Start("http", healthApp.End(), httputils.Handler(handler, healthApp, recoverer.Middleware, prometheusApp.Middleware, tracerApp.Middleware, owasp.New(owaspConfig).Middleware)) healthApp.WaitForTermination(appServer.Done()) - server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpShareApp.Done(), amqpWebhookApp.Done()) + server.GracefulWait(appServer.Done(), promServer.Done(), amqpExifApp.Done(), amqpShareApp.Done(), amqpWebhookApp.Done(), eventBus.Done()) } diff --git a/pkg/provider/event.go b/pkg/provider/event.go index f479082e..f8da3e37 100644 --- a/pkg/provider/event.go +++ b/pkg/provider/event.go @@ -268,6 +268,7 @@ type EventBus struct { tracer trace.Tracer counter *prometheus.CounterVec bus chan Event + closed chan struct{} done chan struct{} } @@ -287,6 +288,7 @@ func NewEventBus(size uint64, prometheusRegisterer prometheus.Registerer, tracer } return EventBus{ + closed: make(chan struct{}), done: make(chan struct{}), bus: make(chan Event, size), counter: counter, @@ -302,11 +304,15 @@ func (e EventBus) increaseMetric(event Event, state string) { e.counter.WithLabelValues(event.Type.String(), state).Inc() } +func (e EventBus) Done() <-chan struct{} { + return e.done +} + func (e EventBus) Push(event Event) error { select { - case <-e.done: + case <-e.closed: e.increaseMetric(event, "refused") - return errors.New("done signal is received") + return errors.New("bus is closed") case e.bus <- event: e.increaseMetric(event, "push") return nil @@ -314,27 +320,29 @@ func (e EventBus) Push(event Event) error { } func (e EventBus) Start(done <-chan struct{}, storageApp absto.Storage, renamers []Renamer, consumers ...EventConsumer) { - defer close(e.bus) defer close(e.done) go func() { - for event := range e.bus { - ctx, end := tracer.StartSpan(context.Background(), e.tracer, "event", trace.WithAttributes(attribute.String("type", event.Type.String()))) + defer close(e.bus) + defer close(e.closed) - if event.Type == RenameEvent && event.Item.IsDir { - RenameDirectory(ctx, storageApp, renamers, event.Item, *event.New) - } + <-done + }() - for _, consumer := range consumers { - consumer(ctx, event) - } + for event := range e.bus { + ctx, end := tracer.StartSpan(context.Background(), e.tracer, "event", trace.WithAttributes(attribute.String("type", event.Type.String()))) - end() - e.increaseMetric(event, "done") + if event.Type == RenameEvent && event.Item.IsDir { + RenameDirectory(ctx, storageApp, renamers, event.Item, *event.New) + } + + for _, consumer := range consumers { + consumer(ctx, event) } - }() - <-done + end() + e.increaseMetric(event, "done") + } } func RenameDirectory(ctx context.Context, storageApp absto.Storage, renamers []Renamer, old, new absto.Item) { diff --git a/pkg/share/share.go b/pkg/share/share.go index dd8e9532..cc7e571c 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -79,7 +79,6 @@ func New(config Config, storageApp absto.Storage, amqpClient *amqp.Client) (*App }, nil } -// Exclusive does action on shares with exclusive lock func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration, action func(ctx context.Context) error) (bool, error) { fn := func() error { a.Lock() @@ -111,7 +110,6 @@ exclusive: return true, nil } -// Get returns a share based on path func (a *App) Get(requestPath string) provider.Share { cleanPath := strings.TrimPrefix(requestPath, "/") @@ -127,7 +125,6 @@ func (a *App) Get(requestPath string) provider.Share { return provider.Share{} } -// Start worker func (a *App) Start(done <-chan struct{}) { if err := a.loadShares(context.Background()); err != nil { logger.Error("refresh shares: %s", err) diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index ec4bcfcb..0ddd16f3 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -93,7 +93,6 @@ func New(config Config, storageApp absto.Storage, prometheusRegisterer prometheu }, nil } -// Exclusive does action on webhook with exclusive lock func (a *App) Exclusive(ctx context.Context, name string, duration time.Duration, action func(ctx context.Context) error) error { fn := func() error { a.Lock() @@ -125,7 +124,6 @@ exclusive: return nil } -// Start worker func (a *App) Start(_ <-chan struct{}) { a.Lock() defer a.Unlock()