Skip to content

Commit

Permalink
feat: expose maxInflight for stan config (#117)
Browse files Browse the repository at this point in the history
* feat: expose maxInflight for stan config

Signed-off-by: Derek Wang <[email protected]>

* ok

Signed-off-by: Derek Wang <[email protected]>

* fix

Signed-off-by: Derek Wang <[email protected]>

* error

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Jul 16, 2021
1 parent 51c21ad commit 4398de4
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 110 deletions.
143 changes: 64 additions & 79 deletions CHANGELOG.md

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions api/v1alpha1/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type STAN struct {
Subject string `json:"subject" protobuf:"bytes,3,opt,name=subject"`
SubjectPrefix SubjectPrefix `json:"subjectPrefix,omitempty" protobuf:"bytes,6,opt,name=subjectPrefix,casttype=SubjectPrefix"`
Auth *STANAuth `json:"auth,omitempty" protobuf:"bytes,7,opt,name=auth"`
// Max inflight messages when subscribing to the stan server, which means how many messages
// between commits, therefore potential duplicates during disruption
// +kubebuilder:default=20
MaxInflight uint32 `json:"maxInflight,omitempty" protobuf:"bytes,9,opt,name=maxInflight"`
}

type STANAuth struct {
Expand All @@ -34,3 +38,10 @@ func (s *STAN) AuthStrategy() STANAuthStrategy {
}
return STANAuthNone
}

func (s *STAN) GetMaxInflight() int {
if s.MaxInflight < 1 {
return CommitN
}
return int(s.MaxInflight)
}
21 changes: 21 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -935,6 +940,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2793,6 +2803,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2896,6 +2911,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -4370,6 +4390,7 @@ metadata:
stringData:
authToken: testingtokentestingtoken
clusterId: stan
maxInflight: "15"
natsMonitoringUrl: http://stan:8222
natsUrl: nats
subjectPrefix: NamespacedPipelineName
Expand Down
16 changes: 16 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,14 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing
to the stan server, which means how many messages
between commits, therefore potential duplicates
during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -1332,6 +1340,14 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing
to the stan server, which means how many messages
between commits, therefore potential duplicates
during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down
14 changes: 14 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,13 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the
stan server, which means how many messages between commits,
therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -1286,6 +1293,13 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the
stan server, which means how many messages between commits,
therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down
20 changes: 20 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -935,6 +940,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2793,6 +2803,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2896,6 +2911,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down
21 changes: 21 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -935,6 +940,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2793,6 +2803,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2896,6 +2911,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -4370,6 +4390,7 @@ metadata:
stringData:
authToken: testingtokentestingtoken
clusterId: stan
maxInflight: "15"
natsMonitoringUrl: http://stan:8222
natsUrl: nats
subjectPrefix: NamespacedPipelineName
Expand Down
20 changes: 20 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -935,6 +940,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2793,6 +2803,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down Expand Up @@ -2896,6 +2911,11 @@ spec:
type: object
clusterId:
type: string
maxInflight:
default: 20
description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption
format: int32
type: integer
name:
default: default
type: string
Expand Down
1 change: 1 addition & 0 deletions examples/dataflow-stan-default-secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ stringData:
natsMonitoringUrl: http://stan:8222
subjectPrefix: NamespacedPipelineName
authToken: testingtokentestingtoken
maxInflight: "15"
23 changes: 12 additions & 11 deletions runner/sidecar/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,24 @@ func connectSTANSink(ctx context.Context, sinkName string, x *dfv1.STAN) (func(m
go func() {
defer runtimeutil.HandleCrash()
logger.Info("starting stan auto reconnection daemon", "sink", sinkName)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
logger.Info("exiting stan auto reconnection daemon", "sink", sinkName)
return
default:
}
if conn == nil || conn.IsClosed() {
logger.Info("stan connection lost, reconnecting...", "sink", sinkName)
clientID := genClientID()
conn, err = ConnectSTAN(ctx, x, clientID)
if err != nil {
logger.Error(err, "failed to reconnect", "sink", sinkName, "clientID", clientID)
continue
case <-ticker.C:
if conn == nil || conn.IsClosed() {
logger.Info("stan connection lost, reconnecting...", "sink", sinkName)
clientID := genClientID()
conn, err = ConnectSTAN(ctx, x, clientID)
if err != nil {
logger.Error(err, "failed to reconnect", "sink", sinkName, "clientID", clientID)
continue
}
logger.Info("reconnected to stan server.", "sink", sinkName, "clientID", clientID)
}
logger.Info("reconnected to stan server.", "sink", sinkName, "clientID", clientID)
}
}
}()
Expand Down
37 changes: 19 additions & 18 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func connectSTANSource(ctx context.Context, sourceName string, x *dfv1.STAN, f f
stan.SetManualAckMode(),
stan.StartAt(pb.StartPosition_NewOnly),
stan.AckWait(30*time.Second),
stan.MaxInflight(dfv1.CommitN))
stan.MaxInflight(x.GetMaxInflight()))
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}
Expand All @@ -265,28 +265,29 @@ func connectSTANSource(ctx context.Context, sourceName string, x *dfv1.STAN, f f
go func() {
defer runtimeutil.HandleCrash()
logger.Info("starting stan auto reconnection daemon", "source", sourceName)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
logger.Info("exiting stan auto reconnection daemon", "source", sourceName)
return
default:
}
if conn == nil || conn.IsClosed() {
_ = sub.Close()
logger.Info("stan connection lost, reconnecting...", "source", sourceName)
clientID := genClientID()
conn, err = ConnectSTAN(ctx, x, clientID)
if err != nil {
logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID)
continue
}
logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID)
if sub, err = subFunc(); err != nil {
logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID)
// Close the connection to let it retry
_ = conn.Close()
case <-ticker.C:
if conn == nil || conn.IsClosed() {
_ = sub.Close()
logger.Info("stan connection lost, reconnecting...", "source", sourceName)
clientID := genClientID()
conn, err = ConnectSTAN(ctx, x, clientID)
if err != nil {
logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID)
continue
}
logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID)
if sub, err = subFunc(); err != nil {
logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID)
// Close the connection to let it retry
_ = conn.Close()
}
}
}
}
Expand Down
Loading

0 comments on commit 4398de4

Please sign in to comment.