Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Redis Scaler**: Use literal command names in Lua script to fix compatibility with Alibaba Cloud Redis Cluster ([#7758](https://github.com/kedacore/keda/issues/7758))
- **Solace Scaler**: Fix URL escaping for Message VPN and Queue names ([#7481](https://github.com/kedacore/keda/pull/7481))
- **Solr Scaler**: Use net/url to safely encode query parameters ([#7467](https://github.com/kedacore/keda/pull/7467))
- **Splunk Observability Scaler**: Add MTS stream handling with context timeout ([#7799](https://github.com/kedacore/keda/pull/7799))

### Deprecations

Expand Down
113 changes: 101 additions & 12 deletions pkg/scalers/splunk_observability_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ import (

"github.com/go-logr/logr"
"github.com/signalfx/signalflow-client-go/v2/signalflow"
"github.com/signalfx/signalflow-client-go/v2/signalflow/messages"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// splunkO11yStreamMargin bounds the stream read beyond the configured Duration.
const splunkO11yStreamMargin = 10 * time.Second

// splunkO11yDrainTimeout is a short best-effort budget for draining after Stop.
const splunkO11yDrainTimeout = 2 * time.Second

type splunkObservabilityMetadata struct {
TriggerIndex int

Expand Down Expand Up @@ -78,6 +85,45 @@ func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, e
}, nil
}

// stopAndDrain stops the computation and keeps reading Data() until it closes or a
// short grace period elapses, so the SignalFlow client's goroutines are not left
// blocked on sends to an unconsumed channel. If process is non-nil, drained messages
// are passed to it; a process error ends the drain and is returned.
func (s *splunkObservabilityScaler) stopAndDrain(comp *signalflow.Computation, process func(*messages.DataMessage) error) error {
stopCtx, cancel := context.WithTimeout(context.Background(), splunkO11yDrainTimeout)
defer cancel()

if err := comp.Stop(stopCtx); err != nil {
s.logger.V(1).Info("Failed to stop SignalFlow computation", "error", err)
}

dataCh := comp.Data()
for {
// Give the deadline priority so a backend that keeps sending cannot extend the grace budget.
select {
case <-stopCtx.Done():
s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop")
return nil
default:
}

select {
case msg, ok := <-dataCh:
if !ok {
return nil
}
if process != nil {
if err := process(msg); err != nil {
return err
}
}
case <-stopCtx.Done():
s.logger.V(1).Info("Gave up draining SignalFlow data channel after stop")
return nil
}
}
}

func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) {
comp, err := s.apiClient.Execute(ctx, &signalflow.ExecuteRequest{
Program: s.metadata.Query,
Expand All @@ -88,36 +134,79 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64

s.logger.V(1).Info("Started MTS stream.")

stopTimer := time.After(time.Duration(s.metadata.Duration) * time.Second)
go func() {
<-stopTimer
s.logger.V(1).Info("Stopping MTS stream after duration.")
if err := comp.Stop(ctx); err != nil {
s.logger.Error(err, "Failed to stop SignalFlow computation")
}
}()
streamDuration := time.Duration(s.metadata.Duration) * time.Second
// Hard deadline beyond the Duration window so a non-responsive backend cannot block forever.
streamCtx, cancel := context.WithTimeout(ctx, streamDuration+splunkO11yStreamMargin)
defer cancel()

stopTimer := time.NewTimer(streamDuration)
defer stopTimer.Stop()

maxValue := math.Inf(-1)
minValue := math.Inf(1)
valueSum := 0.0
valueCount := 0
s.logger.V(1).Info("Now iterating over results.")
for msg := range comp.Data() {

process := func(msg *messages.DataMessage) error {
if len(msg.Payloads) == 0 {
s.logger.V(1).Info("No data retrieved.")
continue
return nil
}
for _, pl := range msg.Payloads {
value, ok := pl.Value().(float64)
if !ok {
return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64")
return fmt.Errorf("could not convert Splunk Observability metric value to float64")
}
s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value))
Comment thread
rickbrouwer marked this conversation as resolved.
maxValue = math.Max(maxValue, value)
minValue = math.Min(minValue, value)
valueSum += value
valueCount++
}
return nil
}

s.logger.V(1).Info("Now iterating over results.")

dataCh := comp.Data()

// timedOut handles the hard-deadline path: stop, drain, and return the timeout error.
// timedOut handles the hard-deadline path: stop, drain, and return the timeout error.
Comment thread
rickbrouwer marked this conversation as resolved.
Outdated
timedOut := func() (float64, error) {
s.logger.V(1).Info("Context done before stream completed; stopping computation.")
_ = s.stopAndDrain(comp, nil)
return -1, fmt.Errorf("splunk observability query ended before stream completed: %w", streamCtx.Err())
}
Comment thread
rickbrouwer marked this conversation as resolved.
Comment thread
rickbrouwer marked this conversation as resolved.

loop:
for {
// Give the hard deadline priority: select has no fairness, so a continuously
// ready dataCh could otherwise starve the streamCtx.Done() case.
select {
case <-streamCtx.Done():
return timedOut()
default:
}

select {
case <-streamCtx.Done():
return timedOut()
case <-stopTimer.C:
Comment thread
rickbrouwer marked this conversation as resolved.
s.logger.V(1).Info("Stopping MTS stream after duration.")
// Stop, but keep processing any remaining buffered messages for this window.
if err := s.stopAndDrain(comp, process); err != nil {
return -1, err
}
break loop
case msg, ok := <-dataCh:
if !ok {
break loop
}
if err := process(msg); err != nil {
_ = s.stopAndDrain(comp, nil)
return -1, err
}
Comment thread
rickbrouwer marked this conversation as resolved.
}
}
Comment thread
rickbrouwer marked this conversation as resolved.

if valueCount == 0 {
Expand Down
60 changes: 60 additions & 0 deletions pkg/scalers/splunk_observability_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package scalers
import (
"context"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/signalfx/signalflow-client-go/v2/signalflow"
"github.com/signalfx/signalfx-go/idtool"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)
Expand Down Expand Up @@ -94,3 +99,58 @@ func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) {
}
}
}

// newFakeSplunkO11yScaler wires a scaler to a fake backend that streams indefinitely without closing.
func newFakeSplunkO11yScaler(t *testing.T, program string, duration int) (*splunkObservabilityScaler, func()) {
t.Helper()

fake := signalflow.NewRunningFakeBackend()
client, err := fake.Client()
if err != nil {
fake.Stop()
t.Fatal("could not create fake backend client:", err)
}

tsid := idtool.ID(1)
fake.AddProgramTSIDs(program, []idtool.ID{tsid})
fake.SetTSIDFloatData(tsid, 42.0)

scaler := &splunkObservabilityScaler{
metadata: &splunkObservabilityMetadata{
Query: program,
Duration: duration,
QueryAggregator: "max",
},
apiClient: client,
logger: logr.Discard(),
}

return scaler, fake.Stop
}

// Regression guard: a stuck stream must not block getQueryResult past the parent context deadline.
func TestSplunkObservabilityGetQueryResultReturnsOnParentContextCancel(t *testing.T) {
const program = "data('demo.trans.latency').max().publish()"
// Large duration so the stopTimer never fires; the parent deadline must bound the call.
scaler, stop := newFakeSplunkO11yScaler(t, program, 3600)
defer stop()

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

done := make(chan struct{})
start := time.Now()
go func() {
defer close(done)
_, _ = scaler.getQueryResult(ctx)
}()

select {
case <-done:
if elapsed := time.Since(start); elapsed > 5*time.Second {
t.Fatalf("getQueryResult returned after %v, far longer than the context deadline", elapsed)
}
case <-time.After(10 * time.Second):
t.Fatal("getQueryResult did not return after parent context was cancelled; it is hanging")
}
}
Loading