Skip to content

Commit 3c75054

Browse files
committed
Querying the stream consumer's leader node when using a cluster for the accurate count
1 parent 682f7af commit 3c75054

File tree

3 files changed

+98
-46
lines changed

3 files changed

+98
-46
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
3636
- [v1.1.0](#v110)
3737
- [v1.0.0](#v100)
3838

39+
## Unreleased
40+
41+
- **JetStream:** JetStream scaler query the stream consumer's leader pod when clustered ([#2391](https://github.com/kedacore/keda/issues/2391))
42+
3943
### New
4044

4145
- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))

pkg/scalers/nats_jetstream_scaler.go

+92-44
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"net/http"
9+
"net/url"
910
"strconv"
1011

1112
"github.com/go-logr/logr"
@@ -26,32 +27,40 @@ const (
2627
)
2728

2829
type natsJetStreamScaler struct {
29-
metricType v2.MetricTargetType
30-
stream *streamDetail
31-
metadata natsJetStreamMetadata
32-
httpClient *http.Client
33-
logger logr.Logger
30+
metricType v2.MetricTargetType
31+
stream *streamDetail
32+
metadata natsJetStreamMetadata
33+
httpClient *http.Client
34+
logger logr.Logger
35+
leaderStream *streamDetail
3436
}
3537

3638
type natsJetStreamMetadata struct {
37-
monitoringEndpoint string
3839
account string
3940
stream string
4041
consumer string
42+
leaderName string
43+
monitoringURL string
4144
lagThreshold int64
4245
activationLagThreshold int64
46+
clusterSize int
4347
scalerIndex int
4448
}
4549

4650
type jetStreamEndpointResponse struct {
47-
Accounts []accountDetail `json:"account_details"`
51+
Accounts []accountDetail `json:"account_details"`
52+
MetaCluster metaCluster `json:"meta_cluster"`
4853
}
4954

5055
type accountDetail struct {
5156
Name string `json:"name"`
5257
Streams []*streamDetail `json:"stream_detail"`
5358
}
5459

60+
type metaCluster struct {
61+
ClusterSize int `json:"cluster_size"`
62+
}
63+
5564
type streamDetail struct {
5665
Name string `json:"name"`
5766
Config streamConfig `json:"config"`
@@ -77,6 +86,11 @@ type consumerDetail struct {
7786
NumPending int `json:"num_pending"`
7887
Config consumerConfig `json:"config"`
7988
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
89+
Cluster consumerCluster `json:"cluster"`
90+
}
91+
92+
type consumerCluster struct {
93+
Leader string `json:"leader"`
8094
}
8195

8296
type consumerConfig struct {
@@ -158,61 +172,106 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
158172
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
159173
}
160174
}
161-
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
175+
meta.monitoringURL = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
162176

163177
return meta, nil
164178
}
165179

166180
func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
167-
protocol := natsHTTPProtocol
181+
scheme := natsHTTPProtocol
168182
if useHTTPS {
169-
protocol = natsHTTPSProtocol
183+
scheme = natsHTTPSProtocol
170184
}
185+
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
186+
}
171187

172-
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
188+
func (s *natsJetStreamScaler) getNATSJetStreamLeaderEndpoint() (string, error) {
189+
jsURL, err := url.Parse(s.metadata.monitoringURL)
190+
if err != nil {
191+
s.logger.Error(err, "unable to parse monitoring URL to create leader URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
192+
return "", err
193+
}
194+
195+
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, s.metadata.leaderName, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
173196
}
174197

175-
func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
176-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
198+
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamEndpoint string) error {
199+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamEndpoint, nil)
177200
if err != nil {
178-
return false, err
201+
return err
179202
}
180203

181204
resp, err := s.httpClient.Do(req)
182205
if err != nil {
183-
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
184-
return false, err
206+
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", natsJetStreamEndpoint)
207+
return err
185208
}
186209

187210
defer resp.Body.Close()
188211
var jsAccountResp jetStreamEndpointResponse
189212
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
190-
s.logger.Error(err, "unable to decode JetStream account response")
191-
return false, err
213+
s.logger.Error(err, "unable to decode NATS JetStream account details")
214+
return err
192215
}
193216

217+
s.metadata.clusterSize = jsAccountResp.MetaCluster.ClusterSize
218+
194219
// Find and assign the stream that we are looking for.
195-
for _, account := range jsAccountResp.Accounts {
196-
if account.Name == s.metadata.account {
197-
for _, stream := range account.Streams {
220+
for _, jsAccount := range jsAccountResp.Accounts {
221+
if jsAccount.Name == s.metadata.account {
222+
for _, stream := range jsAccount.Streams {
198223
if stream.Name == s.metadata.stream {
199224
s.stream = stream
200225
}
226+
227+
for _, consumer := range stream.Consumers {
228+
if consumer.Name == s.metadata.consumer {
229+
s.metadata.leaderName = consumer.Cluster.Leader
230+
}
231+
}
201232
}
202233
}
203234
}
235+
236+
return nil
237+
}
238+
239+
func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
240+
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
241+
if err != nil {
242+
return false, err
243+
}
244+
245+
// Query the consumer leader pod, it has the accurate count.
246+
if s.metadata.clusterSize > 1 {
247+
monitoringLeaderEndpoint, err := s.getNATSJetStreamLeaderEndpoint()
248+
if err != nil {
249+
return false, err
250+
}
251+
252+
err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderEndpoint)
253+
if err != nil {
254+
return false, err
255+
}
256+
}
257+
204258
return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
205259
}
206260

207261
func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
208262
consumerName := s.metadata.consumer
209263

210-
for _, consumer := range s.stream.Consumers {
264+
stream := s.stream
265+
if s.leaderStream != nil {
266+
stream = s.leaderStream
267+
}
268+
269+
for _, consumer := range stream.Consumers {
211270
if consumer.Name == consumerName {
212271
return int64(consumer.NumPending + consumer.NumAckPending)
213272
}
214273
}
215-
return s.stream.State.LastSequence
274+
return stream.State.LastSequence
216275
}
217276

218277
func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
@@ -230,32 +289,21 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
230289
}
231290

232291
func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
233-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
234-
if err != nil {
235-
return nil, err
236-
}
237-
238-
resp, err := s.httpClient.Do(req)
292+
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
239293
if err != nil {
240-
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
241294
return []external_metrics.ExternalMetricValue{}, err
242295
}
243296

244-
defer resp.Body.Close()
245-
var jsAccountResp jetStreamEndpointResponse
246-
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
247-
s.logger.Error(err, "unable to decode JetStream account details")
248-
return []external_metrics.ExternalMetricValue{}, err
249-
}
297+
// Query the consumer leader pod, it has the accurate count.
298+
if s.metadata.clusterSize > 1 {
299+
monitoringLeaderEndpoint, err := s.getNATSJetStreamLeaderEndpoint()
300+
if err != nil {
301+
return []external_metrics.ExternalMetricValue{}, err
302+
}
250303

251-
// Find and assign the stream that we are looking for.
252-
for _, account := range jsAccountResp.Accounts {
253-
if account.Name == s.metadata.account {
254-
for _, stream := range account.Streams {
255-
if stream.Name == s.metadata.stream {
256-
s.stream = stream
257-
}
258-
}
304+
err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderEndpoint)
305+
if err != nil {
306+
return []external_metrics.ExternalMetricValue{}, err
259307
}
260308
}
261309

pkg/scalers/nats_jetstream_scaler_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ func TestNATSJetStreamGetMetricSpecForScaling(t *testing.T) {
6767
if err != nil {
6868
t.Fatal("Could not parse metadata:", err)
6969
}
70-
mockStanScaler := natsJetStreamScaler{
70+
mockJetStreamScaler := natsJetStreamScaler{
7171
stream: nil,
7272
metadata: meta,
7373
httpClient: http.DefaultClient,
7474
}
7575

76-
metricSpec := mockStanScaler.GetMetricSpecForScaling(ctx)
76+
metricSpec := mockJetStreamScaler.GetMetricSpecForScaling(ctx)
7777
metricName := metricSpec[0].External.Metric.Name
7878
if metricName != testData.name {
7979
t.Error("Wrong External metric source name:", metricName)

0 commit comments

Comments
 (0)