Skip to content

Commit

Permalink
fix: nats disconnect log (#206)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Aug 6, 2021
1 parent c52ff23 commit 616753f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
13 changes: 11 additions & 2 deletions runner/sidecar/shared/stan/stan_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stan
import (
"context"
"fmt"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -56,7 +57,11 @@ func ConnectSTAN(ctx context.Context, secretInterface corev1.SecretInterface, x
nats.NoReconnect(),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
conn.natsConnected = false
logger.Error(err, "nats connection lost")
if err != nil {
logger.Error(err, "nats connection lost")
} else {
logger.Info("nats disconnected")
}
}),
nats.ReconnectHandler(func(nnc *nats.Conn) {
conn.natsConnected = true
Expand Down Expand Up @@ -84,7 +89,11 @@ func ConnectSTAN(ctx context.Context, secretInterface corev1.SecretInterface, x
sc, err := stan.Connect(x.ClusterID, clientID, stan.NatsConn(nc), stan.Pings(5, 60),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
conn.stanConnected = false
logger.Error(err, "stan connection lost", "clientID", clientID)
if reason != nil {
logger.Error(reason, "stan connection lost", "clientID", clientID)
} else {
logger.Info("stan disconnected", "clientID", clientID)
}
}))
if err != nil {
return nil, fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err)
Expand Down
7 changes: 6 additions & 1 deletion runner/sidecar/sink/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"database/sql"
"fmt"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"time"

corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
Expand Down Expand Up @@ -139,3 +140,7 @@ func getDataSource(ctx context.Context, secretInterface corev1.SecretInterface,
}
return "", fmt.Errorf("invalid data source config")
}

func (d dbSink) Close() error {
return d.db.Close()
}
4 changes: 3 additions & 1 deletion runner/sidecar/sink/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package stan
import (
"context"
"fmt"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"math/rand"
"time"

corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
Expand Down Expand Up @@ -74,5 +75,6 @@ func (s stanSink) Sink(msg []byte) error {
}

func (s stanSink) Close() error {
logger.Info("closing stan sink connection")
return s.conn.Close()
}
5 changes: 3 additions & 2 deletions runner/sidecar/source/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"math/rand"
"net/http"
"time"

corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
sharedstan "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (s stanSource) Close() error {
if err := s.sub.Close(); err != nil {
return err
}
logger.Info("closing stan connection")
logger.Info("closing stan source connection")
return s.conn.Close()
}

Expand Down

0 comments on commit 616753f

Please sign in to comment.