Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions cmd/adder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"log/slog"
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"time"

"github.com/blinklabs-io/adder/api"
Expand Down Expand Up @@ -201,9 +203,30 @@ func main() {
logger.Error(fmt.Sprintf("failed to start pipeline: %s", err))
os.Exit(1)
}
err, ok := <-pipe.ErrorChan()
if ok {
logger.Error(fmt.Sprintf("pipeline failed: %s", err))

// Setup graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Handle errors in background
// DON'T exit on errors
go func() {
for err := range pipe.ErrorChan() {
// Log error but keep running
logger.Error(fmt.Sprintf("pipeline error: %s", err))
}
logger.Info("Error channel closed")
}()

logger.Info("Adder started, waiting for shutdown signal...")
<-sigChan
logger.Info("Shutdown signal received, stopping pipeline...")

// Graceful shutdown using Stop() method
if err := pipe.Stop(); err != nil {
logger.Error(fmt.Sprintf("failed to stop pipeline: %s", err))
os.Exit(1)
}

logger.Info("Adder stopped gracefully")
}
125 changes: 90 additions & 35 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Blink Labs Software
// Copyright 2025 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package pipeline

import (
"errors"
"fmt"
"sync"

Expand All @@ -31,6 +32,7 @@ type Pipeline struct {
errorChan chan error
doneChan chan bool
wg sync.WaitGroup
stopOnce sync.Once
}

func New() *Pipeline {
Expand All @@ -55,44 +57,61 @@ func (p *Pipeline) AddOutput(output plugin.Plugin) {
p.outputs = append(p.outputs, output)
}

func (p *Pipeline) ErrorChan() chan error {
// ErrorChan is read-only
func (p *Pipeline) ErrorChan() <-chan error {
return p.errorChan
}

// Start initiates the configured plugins and starts the necessary background processes to run the pipeline
func (p *Pipeline) Start() error {
// Check if doneChan is already closed this happens if pipeline was stopped
// A stopped pipeline cannot be restarted
select {
case <-p.doneChan:
return errors.New("cannot start a stopped pipeline")
default:
// continue
}

// Start inputs
for _, input := range p.inputs {
if err := input.Start(); err != nil {
return fmt.Errorf("failed to start input: %w", err)
}
// Start background process to send input events to combined filter channel
p.wg.Add(1)
go p.chanCopyLoop(input.OutputChan(), p.filterChan)
// Start background error listener
p.wg.Add(1)
go p.errorChanWait(input.ErrorChan())
}
// Start filters
for idx, filter := range p.filters {
if err := filter.Start(); err != nil {
return fmt.Errorf("failed to start input: %w", err)
return fmt.Errorf("failed to start filter: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😅

}
if idx == 0 {
// Start background process to send events from combined filter channel to first filter plugin
p.wg.Add(1)
go p.chanCopyLoop(p.filterChan, filter.InputChan())
} else {
// Start background process to send events from previous filter plugin to current filter plugin
p.wg.Add(1)
go p.chanCopyLoop(p.filters[idx-1].OutputChan(), filter.InputChan())
}
if idx == len(p.filters)-1 {
// Start background process to send events from last filter to combined output channel
p.wg.Add(1)
go p.chanCopyLoop(filter.OutputChan(), p.outputChan)
}
// Start background error listener
p.wg.Add(1)
go p.errorChanWait(filter.ErrorChan())
}
if len(p.filters) == 0 {
// Start background process to send events from combined filter channel to combined output channel if
// there are no filter plugins
p.wg.Add(1)
go p.chanCopyLoop(p.filterChan, p.outputChan)
}
// Start outputs
Expand All @@ -101,78 +120,114 @@ func (p *Pipeline) Start() error {
return fmt.Errorf("failed to start output: %w", err)
}
// Start background error listener
p.wg.Add(1)
go p.errorChanWait(output.ErrorChan())
}
p.wg.Add(1)
go p.outputChanLoop()
return nil
}

// Stop shuts down the pipeline and all plugins
// Stop is idempotent and safe to call multiple times
// A stopped pipeline cannot be restarted
func (p *Pipeline) Stop() error {
close(p.doneChan)
p.wg.Wait()
close(p.errorChan)
close(p.filterChan)
close(p.outputChan)
// Stop inputs
for _, input := range p.inputs {
if err := input.Stop(); err != nil {
return fmt.Errorf("failed to stop input: %w", err)
var stopErrors []error

p.stopOnce.Do(func() {
close(p.doneChan)
p.wg.Wait()

// Stop plugins and collect errors
for _, input := range p.inputs {
if err := input.Stop(); err != nil {
stopErrors = append(stopErrors, fmt.Errorf("failed to stop input: %w", err))
}
}
}
// Stop outputs
for _, output := range p.outputs {
if err := output.Stop(); err != nil {
return fmt.Errorf("failed to stop output: %w", err)
for _, filter := range p.filters {
if err := filter.Stop(); err != nil {
stopErrors = append(stopErrors, fmt.Errorf("failed to stop filter: %w", err))
}
}
}
return nil
for _, output := range p.outputs {
if err := output.Stop(); err != nil {
stopErrors = append(stopErrors, fmt.Errorf("failed to stop output: %w", err))
}
}

close(p.errorChan)
close(p.filterChan)
close(p.outputChan)
})

return errors.Join(stopErrors...)
}

// chanCopyLoop is a generic function for reading an event from one channel and writing it to another in a loop
func (p *Pipeline) chanCopyLoop(
input <-chan event.Event,
output chan<- event.Event,
) {
p.wg.Add(1)
defer p.wg.Done()
for {
select {
case <-p.doneChan:
p.wg.Done()
return
case evt, ok := <-input:
if ok {
// Copy input event to output chan
output <- evt
if !ok {
return
}
select {
// Pass input event to output chan
case output <- evt:
case <-p.doneChan:
return
}
}
}
}

// outputChanLoop reads events from the output channel and writes them to each output plugin's input channel
func (p *Pipeline) outputChanLoop() {
p.wg.Add(1)
defer p.wg.Done()
for {
select {
case <-p.doneChan:
p.wg.Done()
return
case evt, ok := <-p.outputChan:
if ok {
// Send event to all output plugins
for _, output := range p.outputs {
output.InputChan() <- evt
if !ok {
return
}
// Send event to all output plugins
for _, output := range p.outputs {
select {
case output.InputChan() <- evt:
case <-p.doneChan:
return
}
}
}
}
}

// errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel and the plugin stopped
// errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel
func (p *Pipeline) errorChanWait(errorChan chan error) {
err, ok := <-errorChan
if ok {
p.errorChan <- err
_ = p.Stop()
defer p.wg.Done()
for {
select {
case <-p.doneChan:
return
case err, ok := <-errorChan:
if !ok {
// Channel closed
return
}
// Forward plugin error to pipeline error channel
select {
case p.errorChan <- err:
case <-p.doneChan:
return
}
}
}
}
Loading