Skip to content

Commit 7ed83a5

Browse files
committed
Add support for JetStream scaler to query the stream consumer leader when clustered
Signed-off-by: Ray <[email protected]>
1 parent a489ca7 commit 7ed83a5

File tree

8 files changed

+1218
-483
lines changed

8 files changed

+1218
-483
lines changed

CHANGELOG.md

+1
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
6666
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
6767
- **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756))
6868
- **Metrics API Scaler:** Add unsafeSsl parameter to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
69+
- **NATS Jetstream Scaler:** Query the stream consumer leader when clustered ([#3860](https://github.com/kedacore/keda/issues/3860))
6970
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
7071
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
7172
- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844))

pkg/scalers/nats_jetstream_scaler.go

+214-55
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,9 @@ import (
66
"errors"
77
"fmt"
88
"net/http"
9+
"net/url"
910
"strconv"
11+
"strings"
1012

1113
"github.com/go-logr/logr"
1214
v2 "k8s.io/api/autoscaling/v2"
@@ -18,10 +20,11 @@ import (
1820
)
1921

2022
const (
21-
jetStreamMetricType = "External"
22-
defaultJetStreamLagThreshold = 10
23-
natsHTTPProtocol = "http"
24-
natsHTTPSProtocol = "https"
23+
jetStreamMetricType = "External"
24+
defaultJetStreamLagThreshold = 10
25+
natsHTTPProtocol = "http"
26+
natsHTTPSProtocol = "https"
27+
jetStreamLagThresholdMetricName = "lagThreshold"
2528
)
2629

2730
type natsJetStreamScaler struct {
@@ -33,24 +36,41 @@ type natsJetStreamScaler struct {
3336
}
3437

3538
type natsJetStreamMetadata struct {
36-
monitoringEndpoint string
3739
account string
3840
stream string
3941
consumer string
42+
consumerLeader string
43+
monitoringURL string
44+
monitoringLeaderURL string
4045
lagThreshold int64
4146
activationLagThreshold int64
47+
clusterSize int
4248
scalerIndex int
4349
}
4450

4551
type jetStreamEndpointResponse struct {
46-
Accounts []accountDetail `json:"account_details"`
52+
Accounts []accountDetail `json:"account_details"`
53+
MetaCluster metaCluster `json:"meta_cluster"`
54+
}
55+
56+
type jetStreamServerEndpointResponse struct {
57+
Cluster jetStreamCluster `json:"cluster"`
58+
ServerName string `json:"server_name"`
59+
}
60+
61+
type jetStreamCluster struct {
62+
HostUrls []string `json:"urls"`
4763
}
4864

4965
type accountDetail struct {
5066
Name string `json:"name"`
5167
Streams []*streamDetail `json:"stream_detail"`
5268
}
5369

70+
type metaCluster struct {
71+
ClusterSize int `json:"cluster_size"`
72+
}
73+
5474
type streamDetail struct {
5575
Name string `json:"name"`
5676
Config streamConfig `json:"config"`
@@ -76,6 +96,11 @@ type consumerDetail struct {
7696
NumPending int `json:"num_pending"`
7797
Config consumerConfig `json:"config"`
7898
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
99+
Cluster consumerCluster `json:"cluster"`
100+
}
101+
102+
type consumerCluster struct {
103+
Leader string `json:"leader"`
79104
}
80105

81106
type consumerConfig struct {
@@ -127,11 +152,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
127152

128153
meta.lagThreshold = defaultJetStreamLagThreshold
129154

130-
if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
155+
if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok {
131156
t, err := strconv.ParseInt(val, 10, 64)
132157
if err != nil {
133-
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
158+
return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err)
134159
}
160+
135161
meta.lagThreshold = t
136162
}
137163

@@ -157,49 +183,202 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
157183
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
158184
}
159185
}
160-
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
186+
meta.monitoringURL = getNATSJetStreamMonitoringURL(useHTTPS, natsServerEndpoint, meta.account)
161187

162188
return meta, nil
163189
}
164190

165-
func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
166-
protocol := natsHTTPProtocol
167-
if useHTTPS {
168-
protocol = natsHTTPSProtocol
191+
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error {
192+
// save the leader URL, then we can check if it has changed
193+
cachedConsumerLeader := s.metadata.consumerLeader
194+
// default URL (standalone)
195+
monitoringURL := natsJetStreamMonitoringURL
196+
// use the leader URL if we already have it
197+
if s.metadata.monitoringLeaderURL != "" {
198+
monitoringURL = s.metadata.monitoringLeaderURL
199+
}
200+
201+
jetStreamAccountResp, err := s.getNATSJetstreamMonitoringRequest(ctx, monitoringURL)
202+
if err != nil {
203+
return err
169204
}
170205

171-
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
206+
consumerFound := s.setNATSJetStreamMonitoringData(jetStreamAccountResp, "")
207+
208+
// invalidate the cached data if we used it but nothing was found
209+
if cachedConsumerLeader != "" && !consumerFound {
210+
s.invalidateNATSJetStreamCachedMonitoringData()
211+
}
212+
213+
// the leader name hasn't changed from the previous run, we can assume we just queried the correct leader node
214+
if consumerFound && cachedConsumerLeader != "" && cachedConsumerLeader == s.metadata.consumerLeader {
215+
return nil
216+
}
217+
218+
if s.metadata.clusterSize > 1 {
219+
// we know who the consumer leader is, query it directly
220+
if s.metadata.consumerLeader != "" {
221+
natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader)
222+
if err != nil {
223+
return err
224+
}
225+
226+
jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
227+
if err != nil {
228+
return err
229+
}
230+
231+
s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringLeaderURL)
232+
return nil
233+
}
234+
235+
// we haven't found the consumer yet, grab the list of hosts and try each one
236+
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL()
237+
if err != nil {
238+
return err
239+
}
240+
241+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
242+
if err != nil {
243+
return err
244+
}
245+
246+
resp, err := s.httpClient.Do(req)
247+
if err != nil {
248+
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
249+
return err
250+
}
251+
252+
defer resp.Body.Close()
253+
var jetStreamServerResp *jetStreamServerEndpointResponse
254+
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
255+
s.logger.Error(err, "unable to decode NATS JetStream server details")
256+
return err
257+
}
258+
259+
for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls {
260+
node := strings.Split(clusterURL, ".")[0]
261+
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node)
262+
if err != nil {
263+
return err
264+
}
265+
266+
jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL)
267+
if err != nil {
268+
return err
269+
}
270+
271+
for _, jetStreamAccount := range jetStreamAccountResp.Accounts {
272+
if jetStreamAccount.Name == s.metadata.account {
273+
for _, stream := range jetStreamAccount.Streams {
274+
if stream.Name == s.metadata.stream {
275+
for _, consumer := range stream.Consumers {
276+
if consumer.Name == s.metadata.consumer {
277+
// this node is the consumer leader
278+
if node == consumer.Cluster.Leader {
279+
s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringNodeURL)
280+
return nil
281+
}
282+
}
283+
}
284+
}
285+
}
286+
}
287+
}
288+
}
289+
}
290+
return nil
172291
}
173292

174-
func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
175-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
293+
func (s *natsJetStreamScaler) setNATSJetStreamMonitoringData(jetStreamAccountResp *jetStreamEndpointResponse, leaderURL string) bool {
294+
s.metadata.clusterSize = jetStreamAccountResp.MetaCluster.ClusterSize
295+
296+
// find and assign the stream that we are looking for.
297+
for _, jsAccount := range jetStreamAccountResp.Accounts {
298+
if jsAccount.Name == s.metadata.account {
299+
for _, stream := range jsAccount.Streams {
300+
if stream.Name == s.metadata.stream {
301+
s.stream = stream
302+
303+
for _, consumer := range stream.Consumers {
304+
if consumer.Name == s.metadata.consumer {
305+
s.metadata.consumerLeader = consumer.Cluster.Leader
306+
if leaderURL != "" {
307+
s.metadata.monitoringLeaderURL = leaderURL
308+
}
309+
return true
310+
}
311+
}
312+
}
313+
}
314+
}
315+
}
316+
return false
317+
}
318+
319+
func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() {
320+
s.metadata.consumerLeader = ""
321+
s.metadata.monitoringLeaderURL = ""
322+
s.stream = nil
323+
}
324+
325+
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) {
326+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
176327
if err != nil {
177-
return false, err
328+
return nil, err
178329
}
179330

180331
resp, err := s.httpClient.Do(req)
181332
if err != nil {
182-
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
183-
return false, err
333+
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL)
334+
return nil, err
184335
}
185336

186337
defer resp.Body.Close()
187-
var jsAccountResp jetStreamEndpointResponse
338+
var jsAccountResp *jetStreamEndpointResponse
188339
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
189-
s.logger.Error(err, "unable to decode JetStream account response")
340+
s.logger.Error(err, "unable to decode NATS JetStream account details")
341+
return nil, err
342+
}
343+
return jsAccountResp, nil
344+
}
345+
346+
func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, account string) string {
347+
scheme := natsHTTPProtocol
348+
if useHTTPS {
349+
scheme = natsHTTPSProtocol
350+
}
351+
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
352+
}
353+
354+
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) {
355+
jsURL, err := url.Parse(s.metadata.monitoringURL)
356+
if err != nil {
357+
s.logger.Error(err, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
358+
return "", err
359+
}
360+
return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, jsURL.Host), nil
361+
}
362+
363+
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) {
364+
jsURL, err := url.Parse(s.metadata.monitoringURL)
365+
if err != nil {
366+
s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
367+
return "", err
368+
}
369+
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
370+
}
371+
372+
func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
373+
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
374+
if err != nil {
190375
return false, err
191376
}
192377

193-
// Find and assign the stream that we are looking for.
194-
for _, account := range jsAccountResp.Accounts {
195-
if account.Name == s.metadata.account {
196-
for _, stream := range account.Streams {
197-
if stream.Name == s.metadata.stream {
198-
s.stream = stream
199-
}
200-
}
201-
}
378+
if s.stream == nil {
379+
return false, errors.New("stream not found")
202380
}
381+
203382
return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
204383
}
205384

@@ -223,39 +402,20 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
223402
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
224403
}
225404
metricSpec := v2.MetricSpec{
226-
External: externalMetric, Type: jetStreamMetricType,
405+
External: externalMetric,
406+
Type: jetStreamMetricType,
227407
}
228408
return []v2.MetricSpec{metricSpec}
229409
}
230410

231411
func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
232-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
412+
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
233413
if err != nil {
234-
return nil, err
235-
}
236-
237-
resp, err := s.httpClient.Do(req)
238-
if err != nil {
239-
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
240-
return []external_metrics.ExternalMetricValue{}, err
241-
}
242-
243-
defer resp.Body.Close()
244-
var jsAccountResp jetStreamEndpointResponse
245-
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
246-
s.logger.Error(err, "unable to decode JetStream account details")
247414
return []external_metrics.ExternalMetricValue{}, err
248415
}
249416

250-
// Find and assign the stream that we are looking for.
251-
for _, account := range jsAccountResp.Accounts {
252-
if account.Name == s.metadata.account {
253-
for _, stream := range account.Streams {
254-
if stream.Name == s.metadata.stream {
255-
s.stream = stream
256-
}
257-
}
258-
}
417+
if s.stream == nil {
418+
return []external_metrics.ExternalMetricValue{}, errors.New("stream not found")
259419
}
260420

261421
totalLag := s.getMaxMsgLag()
@@ -266,7 +426,6 @@ func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string)
266426
Value: *resource.NewQuantity(totalLag, resource.DecimalSI),
267427
Timestamp: metav1.Now(),
268428
}
269-
270429
return append([]external_metrics.ExternalMetricValue{}, metric), nil
271430
}
272431

0 commit comments

Comments
 (0)