File tree Expand file tree Collapse file tree 1 file changed +7
-0
lines changed Expand file tree Collapse file tree 1 file changed +7
-0
lines changed Original file line number Diff line number Diff line change @@ -16,6 +16,7 @@ package pipeline
1616
1717import (
1818 "fmt"
19+ "sync"
1920
2021 "github.com/blinklabs-io/adder/event"
2122 "github.com/blinklabs-io/adder/plugin"
@@ -29,6 +30,7 @@ type Pipeline struct {
2930 outputChan chan event.Event
3031 errorChan chan error
3132 doneChan chan bool
33+ wg sync.WaitGroup
3234}
3335
3436func New () * Pipeline {
@@ -108,6 +110,7 @@ func (p *Pipeline) Start() error {
108110// Stop shuts down the pipeline and all plugins
109111func (p * Pipeline ) Stop () error {
110112 close (p .doneChan )
113+ p .wg .Wait ()
111114 close (p .errorChan )
112115 close (p .filterChan )
113116 close (p .outputChan )
@@ -131,9 +134,11 @@ func (p *Pipeline) chanCopyLoop(
131134 input <- chan event.Event ,
132135 output chan <- event.Event ,
133136) {
137+ p .wg .Add (1 )
134138 for {
135139 select {
136140 case <- p .doneChan :
141+ p .wg .Done ()
137142 return
138143 case evt , ok := <- input :
139144 if ok {
@@ -146,9 +151,11 @@ func (p *Pipeline) chanCopyLoop(
146151
147152// outputChanLoop reads events from the output channel and writes them to each output plugin's input channel
148153func (p * Pipeline ) outputChanLoop () {
154+ p .wg .Add (1 )
149155 for {
150156 select {
151157 case <- p .doneChan :
158+ p .wg .Done ()
152159 return
153160 case evt , ok := <- p .outputChan :
154161 if ok {
You can’t perform that action at this time.
0 commit comments