diff --git a/cmd/adder/main.go b/cmd/adder/main.go index b77f969..9591e97 100644 --- a/cmd/adder/main.go +++ b/cmd/adder/main.go @@ -19,7 +19,9 @@ import ( "log/slog" "net/http" "os" + "os/signal" "runtime" + "syscall" "time" "github.com/blinklabs-io/adder/api" @@ -201,9 +203,30 @@ func main() { logger.Error(fmt.Sprintf("failed to start pipeline: %s", err)) os.Exit(1) } - err, ok := <-pipe.ErrorChan() - if ok { - logger.Error(fmt.Sprintf("pipeline failed: %s", err)) + + // Setup graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Handle errors in background + // DON'T exit on errors + go func() { + for err := range pipe.ErrorChan() { + // Log error but keep running + logger.Error(fmt.Sprintf("pipeline error: %s", err)) + } + logger.Info("Error channel closed") + }() + + logger.Info("Adder started, waiting for shutdown signal...") + <-sigChan + logger.Info("Shutdown signal received, stopping pipeline...") + + // Graceful shutdown using Stop() method + if err := pipe.Stop(); err != nil { + logger.Error(fmt.Sprintf("failed to stop pipeline: %s", err)) os.Exit(1) } + + logger.Info("Adder stopped gracefully") } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index a4605d2..ea334fd 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -1,4 +1,4 @@ -// Copyright 2023 Blink Labs Software +// Copyright 2025 Blink Labs Software // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package pipeline import ( + "errors" "fmt" "sync" @@ -31,6 +32,7 @@ type Pipeline struct { errorChan chan error doneChan chan bool wg sync.WaitGroup + stopOnce sync.Once } func New() *Pipeline { @@ -55,44 +57,61 @@ func (p *Pipeline) AddOutput(output plugin.Plugin) { p.outputs = append(p.outputs, output) } -func (p *Pipeline) ErrorChan() chan error { +// ErrorChan is read-only +func (p *Pipeline) ErrorChan() <-chan error { return p.errorChan } // Start initiates the configured plugins and starts the necessary background processes to run the pipeline func (p *Pipeline) Start() error { + // Check if doneChan is already closed this happens if pipeline was stopped + // A stopped pipeline cannot be restarted + select { + case <-p.doneChan: + return errors.New("cannot start a stopped pipeline") + default: + // continue + } + // Start inputs for _, input := range p.inputs { if err := input.Start(); err != nil { return fmt.Errorf("failed to start input: %w", err) } // Start background process to send input events to combined filter channel + p.wg.Add(1) go p.chanCopyLoop(input.OutputChan(), p.filterChan) // Start background error listener + p.wg.Add(1) go p.errorChanWait(input.ErrorChan()) } // Start filters for idx, filter := range p.filters { if err := filter.Start(); err != nil { - return fmt.Errorf("failed to start input: %w", err) + return fmt.Errorf("failed to start filter: %w", err) } if idx == 0 { // Start background process to send events from combined filter channel to first filter plugin + p.wg.Add(1) go p.chanCopyLoop(p.filterChan, filter.InputChan()) } else { // Start background process to send events from previous filter plugin to current filter plugin + p.wg.Add(1) go p.chanCopyLoop(p.filters[idx-1].OutputChan(), filter.InputChan()) } if idx == len(p.filters)-1 { // Start background process to send events from last filter to combined output channel + p.wg.Add(1) go p.chanCopyLoop(filter.OutputChan(), p.outputChan) } // Start background error listener + p.wg.Add(1) go p.errorChanWait(filter.ErrorChan()) } if len(p.filters) == 0 { // Start background process to send events from combined filter channel to combined output channel if // there are no filter plugins + p.wg.Add(1) go p.chanCopyLoop(p.filterChan, p.outputChan) } // Start outputs @@ -101,32 +120,47 @@ func (p *Pipeline) Start() error { return fmt.Errorf("failed to start output: %w", err) } // Start background error listener + p.wg.Add(1) go p.errorChanWait(output.ErrorChan()) } + p.wg.Add(1) go p.outputChanLoop() return nil } // Stop shuts down the pipeline and all plugins +// Stop is idempotent and safe to call multiple times +// A stopped pipeline cannot be restarted func (p *Pipeline) Stop() error { - close(p.doneChan) - p.wg.Wait() - close(p.errorChan) - close(p.filterChan) - close(p.outputChan) - // Stop inputs - for _, input := range p.inputs { - if err := input.Stop(); err != nil { - return fmt.Errorf("failed to stop input: %w", err) + var stopErrors []error + + p.stopOnce.Do(func() { + close(p.doneChan) + p.wg.Wait() + + // Stop plugins and collect errors + for _, input := range p.inputs { + if err := input.Stop(); err != nil { + stopErrors = append(stopErrors, fmt.Errorf("failed to stop input: %w", err)) + } } - } - // Stop outputs - for _, output := range p.outputs { - if err := output.Stop(); err != nil { - return fmt.Errorf("failed to stop output: %w", err) + for _, filter := range p.filters { + if err := filter.Stop(); err != nil { + stopErrors = append(stopErrors, fmt.Errorf("failed to stop filter: %w", err)) + } } - } - return nil + for _, output := range p.outputs { + if err := output.Stop(); err != nil { + stopErrors = append(stopErrors, fmt.Errorf("failed to stop output: %w", err)) + } + } + + close(p.errorChan) + close(p.filterChan) + close(p.outputChan) + }) + + return errors.Join(stopErrors...) } // chanCopyLoop is a generic function for reading an event from one channel and writing it to another in a loop @@ -134,16 +168,20 @@ func (p *Pipeline) chanCopyLoop( input <-chan event.Event, output chan<- event.Event, ) { - p.wg.Add(1) + defer p.wg.Done() for { select { case <-p.doneChan: - p.wg.Done() return case evt, ok := <-input: - if ok { - // Copy input event to output chan - output <- evt + if !ok { + return + } + select { + // Pass input event to output chan + case output <- evt: + case <-p.doneChan: + return } } } @@ -151,28 +189,45 @@ func (p *Pipeline) chanCopyLoop( // outputChanLoop reads events from the output channel and writes them to each output plugin's input channel func (p *Pipeline) outputChanLoop() { - p.wg.Add(1) + defer p.wg.Done() for { select { case <-p.doneChan: - p.wg.Done() return case evt, ok := <-p.outputChan: - if ok { - // Send event to all output plugins - for _, output := range p.outputs { - output.InputChan() <- evt + if !ok { + return + } + // Send event to all output plugins + for _, output := range p.outputs { + select { + case output.InputChan() <- evt: + case <-p.doneChan: + return } } } } } -// errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel and the plugin stopped +// errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel func (p *Pipeline) errorChanWait(errorChan chan error) { - err, ok := <-errorChan - if ok { - p.errorChan <- err - _ = p.Stop() + defer p.wg.Done() + for { + select { + case <-p.doneChan: + return + case err, ok := <-errorChan: + if !ok { + // Channel closed + return + } + // Forward plugin error to pipeline error channel + select { + case p.errorChan <- err: + case <-p.doneChan: + return + } + } } }