Skip to content

Commit

Permalink
Move wg to lock (#81)
Browse files Browse the repository at this point in the history
* Move wg to lock

Co-authored-by: o.omahony <[email protected]>
  • Loading branch information
oliveromahony and o.omahony authored Oct 20, 2022
1 parent e31e506 commit c1bd278
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 30 deletions.
29 changes: 14 additions & 15 deletions src/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type MessagePipe struct {
plugins []Plugin
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
bus messagebus.MessageBus
}

Expand All @@ -36,15 +36,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe {
plugins: make([]Plugin, 0, MaxPlugins),
ctx: pipeContext,
cancel: pipeCancel,
wg: sync.WaitGroup{},
mu: sync.Mutex{},
}
}

func (p *MessagePipe) Register(size int, plugins ...Plugin) error {
p.mu.Lock()

p.plugins = append(p.plugins, plugins...)
p.bus = messagebus.New(size)
p.wg.Add(1)
defer p.wg.Done()
p.bus = messagebus.New(size)

for _, plugin := range p.plugins {
for _, subscription := range plugin.Subscriptions() {
Expand All @@ -55,6 +55,7 @@ func (p *MessagePipe) Register(size int, plugins ...Plugin) error {
}
}

p.mu.Unlock()
return nil
}

Expand All @@ -74,12 +75,17 @@ func (p *MessagePipe) Run() {
for {
select {
case <-p.ctx.Done():
p.shutdown()

for _, r := range p.plugins {
r.Close()
}
close(p.messageChannel)

return
case m := <-p.messageChannel:
p.wg.Add(1)
defer p.wg.Done()
p.mu.Lock()
p.bus.Publish(m.Topic(), m)
p.mu.Unlock()
}
}
}
Expand All @@ -101,10 +107,3 @@ func (p *MessagePipe) initPlugins() {
r.Init(p)
}
}

func (p *MessagePipe) shutdown() {
for _, r := range p.plugins {
r.Close()
}
close(p.messageChannel)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c1bd278

Please sign in to comment.