From 06e726b07d770c6f1897a4da4af5da08287b8767 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 14:14:34 +0200 Subject: [PATCH 1/6] fix(splunk): add MTS stream handling with context timeout Signed-off-by: Rick Brouwer --- CHANGELOG.md | 1 + pkg/scalers/splunk_observability_scaler.go | 87 ++++++++++++++----- .../splunk_observability_scaler_test.go | 60 +++++++++++++ 3 files changed, 127 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8711321daa2..14d7ef6a9ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,6 +141,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Redis Scaler**: Use literal command names in Lua script to fix compatibility with Alibaba Cloud Redis Cluster ([#7758](https://github.com/kedacore/keda/issues/7758)) - **Solace Scaler**: Fix URL escaping for Message VPN and Queue names ([#7481](https://github.com/kedacore/keda/pull/7481)) - **Solr Scaler**: Use net/url to safely encode query parameters ([#7467](https://github.com/kedacore/keda/pull/7467)) +- **Splunk Observability Scaler**: Add MTS stream handling with context timeout ([#7799](https://github.com/kedacore/keda/pull/7799)) ### Deprecations diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 89cd34db61c..858f79b764c 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -15,6 +15,12 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) +// splunkO11yStreamMargin bounds the stream read beyond the configured Duration. +const splunkO11yStreamMargin = 10 * time.Second + +// splunkO11yDrainTimeout is a short best-effort budget for draining after Stop. +const splunkO11yDrainTimeout = 2 * time.Second + type splunkObservabilityMetadata struct { TriggerIndex int @@ -78,6 +84,30 @@ func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, e }, nil } +// stopAndDrain stops the computation and drains the data channel so the client's goroutines can exit. +func (s *splunkObservabilityScaler) stopAndDrain(comp *signalflow.Computation) { + // Fresh, short context: the caller's may be done and some backends never close on Stop. + stopCtx, cancel := context.WithTimeout(context.Background(), splunkO11yDrainTimeout) + defer cancel() + + if err := comp.Stop(stopCtx); err != nil { + s.logger.V(1).Info("Failed to stop SignalFlow computation", "error", err) + } + + dataCh := comp.Data() + for { + select { + case _, ok := <-dataCh: + if !ok { + return + } + case <-stopCtx.Done(): + s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop") + return + } + } +} + func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) { comp, err := s.apiClient.Execute(ctx, &signalflow.ExecuteRequest{ Program: s.metadata.Query, @@ -88,35 +118,50 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 s.logger.V(1).Info("Started MTS stream.") - stopTimer := time.After(time.Duration(s.metadata.Duration) * time.Second) - go func() { - <-stopTimer - s.logger.V(1).Info("Stopping MTS stream after duration.") - if err := comp.Stop(ctx); err != nil { - s.logger.Error(err, "Failed to stop SignalFlow computation") - } - }() + // Hard deadline beyond the Duration window so a non-responsive backend cannot block forever. + streamDuration := time.Duration(s.metadata.Duration) * time.Second + streamCtx, cancel := context.WithTimeout(ctx, streamDuration+splunkO11yStreamMargin) + defer cancel() + + stopTimer := time.After(streamDuration) maxValue := math.Inf(-1) minValue := math.Inf(1) valueSum := 0.0 valueCount := 0 s.logger.V(1).Info("Now iterating over results.") - for msg := range comp.Data() { - if len(msg.Payloads) == 0 { - s.logger.V(1).Info("No data retrieved.") - continue - } - for _, pl := range msg.Payloads { - value, ok := pl.Value().(float64) + + dataCh := comp.Data() +loop: + for { + select { + case <-streamCtx.Done(): + s.logger.V(1).Info("Context done before stream completed; stopping computation.") + s.stopAndDrain(comp) + return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) + case <-stopTimer: + s.logger.V(1).Info("Stopping MTS stream after duration.") + s.stopAndDrain(comp) + break loop + case msg, ok := <-dataCh: if !ok { - return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") + break loop + } + if len(msg.Payloads) == 0 { + s.logger.V(1).Info("No data retrieved.") + continue + } + for _, pl := range msg.Payloads { + value, ok := pl.Value().(float64) + if !ok { + return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") + } + s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) + maxValue = math.Max(maxValue, value) + minValue = math.Min(minValue, value) + valueSum += value + valueCount++ } - s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) - maxValue = math.Max(maxValue, value) - minValue = math.Min(minValue, value) - valueSum += value - valueCount++ } } diff --git a/pkg/scalers/splunk_observability_scaler_test.go b/pkg/scalers/splunk_observability_scaler_test.go index 248138fe40a..2809a0bfbe4 100644 --- a/pkg/scalers/splunk_observability_scaler_test.go +++ b/pkg/scalers/splunk_observability_scaler_test.go @@ -3,6 +3,11 @@ package scalers import ( "context" "testing" + "time" + + "github.com/go-logr/logr" + "github.com/signalfx/signalflow-client-go/v2/signalflow" + "github.com/signalfx/signalfx-go/idtool" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) @@ -94,3 +99,58 @@ func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) { } } } + +// newFakeSplunkO11yScaler wires a scaler to a fake backend that streams indefinitely without closing. +func newFakeSplunkO11yScaler(t *testing.T, program string, duration int) (*splunkObservabilityScaler, func()) { + t.Helper() + + fake := signalflow.NewRunningFakeBackend() + client, err := fake.Client() + if err != nil { + fake.Stop() + t.Fatal("could not create fake backend client:", err) + } + + tsid := idtool.ID(1) + fake.AddProgramTSIDs(program, []idtool.ID{tsid}) + fake.SetTSIDFloatData(tsid, 42.0) + + scaler := &splunkObservabilityScaler{ + metadata: &splunkObservabilityMetadata{ + Query: program, + Duration: duration, + QueryAggregator: "max", + }, + apiClient: client, + logger: logr.Discard(), + } + + return scaler, fake.Stop +} + +// Regression guard: a stuck stream must not block getQueryResult past the parent context deadline. +func TestSplunkObservabilityGetQueryResultReturnsOnParentContextCancel(t *testing.T) { + const program = "data('demo.trans.latency').max().publish()" + // Large duration so the stopTimer never fires; the parent deadline must bound the call. + scaler, stop := newFakeSplunkO11yScaler(t, program, 3600) + defer stop() + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + done := make(chan struct{}) + start := time.Now() + go func() { + defer close(done) + _, _ = scaler.getQueryResult(ctx) + }() + + select { + case <-done: + if elapsed := time.Since(start); elapsed > 5*time.Second { + t.Fatalf("getQueryResult returned after %v, far longer than the context deadline", elapsed) + } + case <-time.After(10 * time.Second): + t.Fatal("getQueryResult did not return after parent context was cancelled; it is hanging") + } +} From 69985d96a102e10c6234e59b5282d72fdeec2642 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 16:35:00 +0200 Subject: [PATCH 2/6] update after comment Signed-off-by: Rick Brouwer --- pkg/scalers/splunk_observability_scaler.go | 87 +++++++++++++--------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 858f79b764c..8f443e2e529 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" "github.com/signalfx/signalflow-client-go/v2/signalflow" + "github.com/signalfx/signalflow-client-go/v2/signalflow/messages" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" @@ -84,28 +85,14 @@ func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, e }, nil } -// stopAndDrain stops the computation and drains the data channel so the client's goroutines can exit. -func (s *splunkObservabilityScaler) stopAndDrain(comp *signalflow.Computation) { - // Fresh, short context: the caller's may be done and some backends never close on Stop. +// stopComputation stops the computation, ignoring errors beyond debug logging. +func (s *splunkObservabilityScaler) stopComputation(comp *signalflow.Computation) { stopCtx, cancel := context.WithTimeout(context.Background(), splunkO11yDrainTimeout) defer cancel() if err := comp.Stop(stopCtx); err != nil { s.logger.V(1).Info("Failed to stop SignalFlow computation", "error", err) } - - dataCh := comp.Data() - for { - select { - case _, ok := <-dataCh: - if !ok { - return - } - case <-stopCtx.Done(): - s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop") - return - } - } } func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) { @@ -118,17 +105,38 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 s.logger.V(1).Info("Started MTS stream.") - // Hard deadline beyond the Duration window so a non-responsive backend cannot block forever. streamDuration := time.Duration(s.metadata.Duration) * time.Second + // Hard deadline beyond the Duration window so a non-responsive backend cannot block forever. streamCtx, cancel := context.WithTimeout(ctx, streamDuration+splunkO11yStreamMargin) defer cancel() - stopTimer := time.After(streamDuration) + stopTimer := time.NewTimer(streamDuration) + defer stopTimer.Stop() maxValue := math.Inf(-1) minValue := math.Inf(1) valueSum := 0.0 valueCount := 0 + + process := func(msg *messages.DataMessage) error { + if len(msg.Payloads) == 0 { + s.logger.V(1).Info("No data retrieved.") + return nil + } + for _, pl := range msg.Payloads { + value, ok := pl.Value().(float64) + if !ok { + return fmt.Errorf("could not convert Splunk Observability metric value to float64") + } + s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) + maxValue = math.Max(maxValue, value) + minValue = math.Min(minValue, value) + valueSum += value + valueCount++ + } + return nil + } + s.logger.V(1).Info("Now iterating over results.") dataCh := comp.Data() @@ -137,30 +145,37 @@ loop: select { case <-streamCtx.Done(): s.logger.V(1).Info("Context done before stream completed; stopping computation.") - s.stopAndDrain(comp) + s.stopComputation(comp) return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) - case <-stopTimer: + case <-stopTimer.C: s.logger.V(1).Info("Stopping MTS stream after duration.") - s.stopAndDrain(comp) - break loop + s.stopComputation(comp) + // Process any remaining buffered messages until the channel closes or a grace period elapses. + grace := time.NewTimer(splunkO11yDrainTimeout) + for { + select { + case msg, ok := <-dataCh: + if !ok { + grace.Stop() + break loop + } + if err := process(msg); err != nil { + grace.Stop() + return -1, err + } + continue + case <-grace.C: + case <-streamCtx.Done(): + grace.Stop() + } + break loop + } case msg, ok := <-dataCh: if !ok { break loop } - if len(msg.Payloads) == 0 { - s.logger.V(1).Info("No data retrieved.") - continue - } - for _, pl := range msg.Payloads { - value, ok := pl.Value().(float64) - if !ok { - return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") - } - s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) - maxValue = math.Max(maxValue, value) - minValue = math.Min(minValue, value) - valueSum += value - valueCount++ + if err := process(msg); err != nil { + return -1, err } } } From 11baa0c7d976e3b452ccc211f2c473c75267cb54 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 16:48:06 +0200 Subject: [PATCH 3/6] update after comment Signed-off-by: Rick Brouwer --- pkg/scalers/splunk_observability_scaler.go | 61 ++++++++++++++-------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 8f443e2e529..031a33c4e37 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -85,14 +85,43 @@ func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, e }, nil } -// stopComputation stops the computation, ignoring errors beyond debug logging. -func (s *splunkObservabilityScaler) stopComputation(comp *signalflow.Computation) { +// stopAndDrain stops the computation and keeps reading Data() until it closes or a +// short grace period elapses, so the SignalFlow client's goroutines are not left +// blocked on sends to an unconsumed channel. If process is non-nil, drained messages +// are passed to it; a process error ends the drain and is returned. +func (s *splunkObservabilityScaler) stopAndDrain(comp *signalflow.Computation, process func(*messages.DataMessage) error) error { stopCtx, cancel := context.WithTimeout(context.Background(), splunkO11yDrainTimeout) defer cancel() if err := comp.Stop(stopCtx); err != nil { s.logger.V(1).Info("Failed to stop SignalFlow computation", "error", err) } + + dataCh := comp.Data() + for { + // Give the deadline priority so a backend that keeps sending cannot extend the grace budget. + select { + case <-stopCtx.Done(): + s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop") + return nil + default: + } + + select { + case msg, ok := <-dataCh: + if !ok { + return nil + } + if process != nil { + if err := process(msg); err != nil { + return err + } + } + case <-stopCtx.Done(): + s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop") + return nil + } + } } func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) { @@ -145,36 +174,22 @@ loop: select { case <-streamCtx.Done(): s.logger.V(1).Info("Context done before stream completed; stopping computation.") - s.stopComputation(comp) + // Drain without processing: the result is discarded on this error path. + _ = s.stopAndDrain(comp, nil) return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) case <-stopTimer.C: s.logger.V(1).Info("Stopping MTS stream after duration.") - s.stopComputation(comp) - // Process any remaining buffered messages until the channel closes or a grace period elapses. - grace := time.NewTimer(splunkO11yDrainTimeout) - for { - select { - case msg, ok := <-dataCh: - if !ok { - grace.Stop() - break loop - } - if err := process(msg); err != nil { - grace.Stop() - return -1, err - } - continue - case <-grace.C: - case <-streamCtx.Done(): - grace.Stop() - } - break loop + // Stop, but keep processing any remaining buffered messages for this window. + if err := s.stopAndDrain(comp, process); err != nil { + return -1, err } + break loop case msg, ok := <-dataCh: if !ok { break loop } if err := process(msg); err != nil { + _ = s.stopAndDrain(comp, nil) return -1, err } } From f51456fdff9bf80ce8a39acabdbf1c1c6a70631c Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 16:57:06 +0200 Subject: [PATCH 4/6] update after comment Signed-off-by: Rick Brouwer --- pkg/scalers/splunk_observability_scaler.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 031a33c4e37..edd58770158 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -169,14 +169,27 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 s.logger.V(1).Info("Now iterating over results.") dataCh := comp.Data() + + // timedOut handles the hard-deadline path: stop, drain, and return the timeout error. + timedOut := func() (float64, error) { + s.logger.V(1).Info("Context done before stream completed; stopping computation.") + _ = s.stopAndDrain(comp, nil) + return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) + } + loop: for { + // Give the hard deadline priority: select has no fairness, so a continuously + // ready dataCh could otherwise starve the streamCtx.Done() case. + select { + case <-streamCtx.Done(): + return timedOut() + default: + } + select { case <-streamCtx.Done(): - s.logger.V(1).Info("Context done before stream completed; stopping computation.") - // Drain without processing: the result is discarded on this error path. - _ = s.stopAndDrain(comp, nil) - return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) + return timedOut() case <-stopTimer.C: s.logger.V(1).Info("Stopping MTS stream after duration.") // Stop, but keep processing any remaining buffered messages for this window. From 767f2b60d58fc799080ef247abf3418ac75a28ee Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 17:01:39 +0200 Subject: [PATCH 5/6] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Rick Brouwer --- pkg/scalers/splunk_observability_scaler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index edd58770158..9a7a8473032 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -170,11 +170,12 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 dataCh := comp.Data() + // timedOut handles the hard-deadline path: stop, drain, and return the timeout error. // timedOut handles the hard-deadline path: stop, drain, and return the timeout error. timedOut := func() (float64, error) { s.logger.V(1).Info("Context done before stream completed; stopping computation.") _ = s.stopAndDrain(comp, nil) - return -1, fmt.Errorf("splunk observability query did not complete in time: %w", streamCtx.Err()) + return -1, fmt.Errorf("splunk observability query ended before stream completed: %w", streamCtx.Err()) } loop: From 6f1d7005077b228d1e582abd9e494317e8775964 Mon Sep 17 00:00:00 2001 From: Rick Brouwer Date: Fri, 29 May 2026 17:07:53 +0200 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Rick Brouwer --- pkg/scalers/splunk_observability_scaler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 9a7a8473032..c322f96b5f3 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -170,11 +170,10 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 dataCh := comp.Data() - // timedOut handles the hard-deadline path: stop, drain, and return the timeout error. // timedOut handles the hard-deadline path: stop, drain, and return the timeout error. timedOut := func() (float64, error) { s.logger.V(1).Info("Context done before stream completed; stopping computation.") - _ = s.stopAndDrain(comp, nil) + go func() { _ = s.stopAndDrain(comp, nil) }() return -1, fmt.Errorf("splunk observability query ended before stream completed: %w", streamCtx.Err()) }