diff --git a/agreement/service.go b/agreement/service.go index 5247ce34cc..a3339bbc91 100644 --- a/agreement/service.go +++ b/agreement/service.go @@ -20,6 +20,7 @@ package agreement import ( "context" "io" + "sync" "time" "github.com/algorand/go-algorand/config" @@ -40,7 +41,7 @@ type Service struct { // for exiting quit chan struct{} - done chan struct{} + wg sync.WaitGroup quitFn context.CancelFunc // TODO instead of storing this, pass a context into Start() // external events @@ -139,7 +140,6 @@ func (s *Service) Start() { s.quitFn = quitFn s.quit = make(chan struct{}) - s.done = make(chan struct{}) s.voteVerifier = MakeAsyncVoteVerifier(s.BacklogPool) s.demux = makeDemux(demuxParams{ @@ -165,6 +165,7 @@ func (s *Service) Start() { input := make(chan externalEvent) output := make(chan []action) ready := make(chan externalDemuxSignals) + s.wg.Add(2) go s.demuxLoop(ctx, input, output, ready) go s.mainLoop(input, output, ready) } @@ -178,7 +179,7 @@ func (s *Service) Shutdown() { close(s.quit) s.quitFn() - <-s.done + s.wg.Wait() s.persistenceLoop.Quit() } @@ -189,6 +190,7 @@ func (s *Service) DumpDemuxQueues(w io.Writer) { // demuxLoop repeatedly executes pending actions and then requests the next event from the Service.demux. func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, output <-chan []action, ready <-chan externalDemuxSignals) { + defer s.wg.Done() for a := range output { s.do(ctx, a) extSignals := <-ready @@ -202,7 +204,6 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out s.demux.quit() s.loopback.Quit() s.voteVerifier.Quit() - close(s.done) } // mainLoop drives the state machine. @@ -213,7 +214,8 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out // 3. Drive the state machine with this input to obtain a slice of pending actions. // 4. If necessary, persist state to disk. func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, ready chan<- externalDemuxSignals) { - // setup + defer s.wg.Done() + var clock timers.Clock[TimeoutType] var router rootRouter var status player diff --git a/agreement/service_test.go b/agreement/service_test.go index 3b174c266e..b46aefb659 100644 --- a/agreement/service_test.go +++ b/agreement/service_test.go @@ -2543,6 +2543,7 @@ func TestAgreementServiceStartDeadline(t *testing.T) { close(inputCh) output := make(chan []action, 10) ready := make(chan externalDemuxSignals, 1) + s.wg.Add(1) s.mainLoop(inputCh, output, ready) // check the ready channel: