diff --git a/runner/sidecar/kafka.go b/runner/sidecar/kafka.go index 4f55b946..2b5bcdae 100644 --- a/runner/sidecar/kafka.go +++ b/runner/sidecar/kafka.go @@ -2,15 +2,11 @@ package sidecar import ( "context" - "crypto/tls" - "fmt" "strings" - "time" "github.com/Shopify/sarama" dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" runnerutil "github.com/argoproj-labs/argo-dataflow/runner/util" - "github.com/segmentio/kafka-go" corev1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,37 +26,6 @@ func kafkaFromSecret(k *dfv1.Kafka, secret *corev1.Secret) error { return nil } -func newKafkaConfig(k *dfv1.Kafka) (*sarama.Config, error) { - x := sarama.NewConfig() - x.ClientID = dfv1.CtrSidecar - if k.Version != "" { - v, err := sarama.ParseKafkaVersion(k.Version) - if err != nil { - return nil, fmt.Errorf("failed to parse kafka version %q: %w", k.Version, err) - } - x.Version = v - } - if k.NET != nil { - if k.NET.TLS != nil { - x.Net.TLS.Enable = true - } - } - return x, nil -} - -func newKafkaDialer(k *dfv1.Kafka) *kafka.Dialer { - var t *tls.Config - if k.NET != nil && k.NET.TLS != nil { - t = &tls.Config{} - } - return &kafka.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 20 * time.Second, - DualStack: true, - TLS: t, - } -} - func enrichKafka(ctx context.Context, secrets v1.SecretInterface, x *dfv1.Kafka) error { secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{}) if err != nil { diff --git a/runner/sidecar/shared/stan/stan_conn.go b/runner/sidecar/shared/stan/stan_conn.go index 3c616fb2..50a0ce8a 100644 --- a/runner/sidecar/shared/stan/stan_conn.go +++ b/runner/sidecar/shared/stan/stan_conn.go @@ -3,6 +3,7 @@ package stan import ( "context" "fmt" + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/argoproj-labs/argo-dataflow/shared/util" "github.com/nats-io/nats.go" diff --git a/runner/sidecar/sink/http/http.go b/runner/sidecar/sink/http/http.go index ea72161f..a00e896d 100644 --- a/runner/sidecar/sink/http/http.go +++ b/runner/sidecar/sink/http/http.go @@ -4,13 +4,15 @@ import ( "bytes" "context" "fmt" + "io" + "net/http" + "time" + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink" - "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "net/http" - "time" ) type httpSink struct { diff --git a/runner/sidecar/sink/kafka/kafka.go b/runner/sidecar/sink/kafka/kafka.go index 41a2600c..3981dc54 100644 --- a/runner/sidecar/sink/kafka/kafka.go +++ b/runner/sidecar/sink/kafka/kafka.go @@ -3,10 +3,11 @@ package kafka import ( "context" "crypto/tls" + "time" + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink" "github.com/segmentio/kafka-go" - "time" ) type kafkaSink struct { @@ -36,7 +37,6 @@ func New(x dfv1.Kafka) sink.Interface { func (h kafkaSink) Sink(msg []byte) error { return h.writer.WriteMessages(context.Background(), kafka.Message{Value: msg}) - } func (h kafkaSink) Close() error { diff --git a/runner/sidecar/sink/log/log.go b/runner/sidecar/sink/log/log.go index 137d168b..290d06da 100644 --- a/runner/sidecar/sink/log/log.go +++ b/runner/sidecar/sink/log/log.go @@ -2,10 +2,10 @@ package logsink import ( "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink" - "github.com/argoproj-labs/argo-dataflow/shared/util" + sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" ) -var logger = 
util.NewLogger() +var logger = sharedutil.NewLogger() type logSink struct{} diff --git a/runner/sidecar/sink/stan/stan.go b/runner/sidecar/sink/stan/stan.go index 4217da48..e78f61de 100644 --- a/runner/sidecar/sink/stan/stan.go +++ b/runner/sidecar/sink/stan/stan.go @@ -3,14 +3,15 @@ package stan import ( "context" "fmt" + "math/rand" + "time" + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink" sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" runtimeutil "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" - "math/rand" - "time" ) var logger = sharedutil.NewLogger() diff --git a/runner/sidecar/sinks.go b/runner/sidecar/sinks.go index dea919d4..2873ece0 100644 --- a/runner/sidecar/sinks.go +++ b/runner/sidecar/sinks.go @@ -3,6 +3,8 @@ package sidecar import ( "context" "fmt" + "io" + "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/http" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/kafka" @@ -10,7 +12,6 @@ import ( "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/stan" sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" "github.com/paulbellamy/ratecounter" - "io" ) func connectSinks(ctx context.Context) (func([]byte) error, error) { @@ -45,7 +46,7 @@ func connectSinks(ctx context.Context) (func([]byte) error, error) { if closer, ok := sinks[sinkName].(io.Closer); ok { logger.Info("adding stop hook", "sink", sinkName) addStopHook(func(ctx context.Context) error { - logger.Info("stopping sink", "sink", sinkName) + logger.Info("closing", "sink", sinkName) return closer.Close() }) } diff --git a/runner/sidecar/source/cron/cron.go b/runner/sidecar/source/cron/cron.go new file mode 100644 index 00000000..78e3750c --- /dev/null +++ b/runner/sidecar/source/cron/cron.go @@ -0,0 +1,41 @@ +package cron + +import ( + "context" + "fmt" + "time" + + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" + "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source" + sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" + "github.com/robfig/cron/v3" +) + +var logger = sharedutil.NewLogger() + +type cronSource struct { + crn *cron.Cron +} + +func New(x dfv1.Cron, f source.Func) (source.Interface, error) { + crn := cron.New( + cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)), + cron.WithChain(cron.Recover(logger)), + ) + + go crn.Run() + + _, err := crn.AddFunc(x.Schedule, func() { + msg := []byte(time.Now().Format(x.Layout)) + _ = f(context.Background(), msg) + }) + if err != nil { + return nil, fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err) + } + return cronSource{crn: crn}, nil +} + +func (s cronSource) Close() error { + <-s.crn.Stop().Done() + return nil +} diff --git a/runner/sidecar/source/http/http.go b/runner/sidecar/source/http/http.go new file mode 100644 index 00000000..6c97258f --- /dev/null +++ b/runner/sidecar/source/http/http.go @@ -0,0 +1,42 @@ +package http + +import ( + "context" + "io/ioutil" + "net/http" + + "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source" +) + +type httpSource struct { + ready bool +} + +func New(sourceName string, f source.Func) source.Interface { + h := &httpSource{ready: true} + http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) { + if !h.ready 
{ // if we are not ready, we cannot serve requests
+			w.WriteHeader(503)
+			_, _ = w.Write([]byte("not ready"))
+			return
+		}
+		msg, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			w.WriteHeader(400)
+			_, _ = w.Write([]byte(err.Error()))
+			return
+		}
+		if err := f(context.Background(), msg); err != nil {
+			w.WriteHeader(500)
+			_, _ = w.Write([]byte(err.Error()))
+		} else {
+			w.WriteHeader(204)
+		}
+	})
+	return h
+}
+
+func (s *httpSource) Close() error {
+	s.ready = false
+	return nil
+}
diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go
new file mode 100644
index 00000000..2f257d6f
--- /dev/null
+++ b/runner/sidecar/source/kafka/kafka.go
@@ -0,0 +1,127 @@
+package kafka
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"time"
+
+	"github.com/Shopify/sarama"
+	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
+	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
+	sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
+	"github.com/segmentio/kafka-go"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+var logger = sharedutil.NewLogger()
+
+type kafkaSource struct {
+	source    dfv1.Kafka
+	reader    *kafka.Reader
+	groupName string
+}
+
+func New(ctx context.Context, pipelineName, stepName, sourceName string, x dfv1.Kafka, f source.Func) (source.Interface, error) {
+	groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic
+	var t *tls.Config
+	if x.NET != nil && x.NET.TLS != nil {
+		t = &tls.Config{}
+	}
+	dialer := &kafka.Dialer{
+		Timeout:   10 * time.Second,
+		KeepAlive: 20 * time.Second,
+		DualStack: true,
+		TLS:       t,
+	}
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: x.Brokers,
+		Dialer:  dialer,
+		GroupID: groupName,
+		Topic:   x.Topic,
+	})
+	go wait.JitterUntil(func() {
+		ctx := context.Background()
+		for {
+			m, err := reader.ReadMessage(ctx)
+			if err != nil {
+				logger.Error(err, "failed to read kafka message", "source", sourceName)
+			} else {
+				_ = f(ctx, m.Value)
+			}
+		}
+	}, 3*time.Second, 1.2, true, ctx.Done())
+	return kafkaSource{
+		reader:    reader,
+		source:    x,
+		groupName: groupName,
+	}, nil
+}
+
+func (s kafkaSource) Close() error {
+	return s.reader.Close()
+}
+
+func newKafkaConfig(k dfv1.Kafka) (*sarama.Config, error) {
+	x := sarama.NewConfig()
+	x.ClientID = dfv1.CtrSidecar
+	if k.Version != "" {
+		v, err := sarama.ParseKafkaVersion(k.Version)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse kafka version %q: %w", k.Version, err)
+		}
+		x.Version = v
+	}
+	if k.NET != nil {
+		if k.NET.TLS != nil {
+			x.Net.TLS.Enable = true
+		}
+	}
+	return x, nil
+}
+
+func (s kafkaSource) GetPending(ctx context.Context) (uint64, error) {
+	config, err := newKafkaConfig(s.source)
+	if err != nil {
+		return 0, err
+	}
+	adminClient, err := sarama.NewClusterAdmin(s.source.Brokers, config)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		if err := adminClient.Close(); err != nil {
+			logger.Error(err, "failed to close Kafka admin client")
+		}
+	}()
+	client, err := sarama.NewClient(s.source.Brokers, config) // reuse the same config as the admin client
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		if err := client.Close(); err != nil {
+			logger.Error(err, "failed to close Kafka client")
+		}
+	}()
+	partitions, err := client.Partitions(s.source.Topic)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get partitions: %w", err)
+	}
+	totalLags := int64(0)
+	rep, err := adminClient.ListConsumerGroupOffsets(s.groupName, map[string][]int32{s.source.Topic: partitions})
+	if err != nil {
+		return 0, fmt.Errorf("failed to list consumer group offsets: %w", err)
+	}
+	for _, partition := range partitions {
+		partitionOffset, err := client.GetOffset(s.source.Topic, partition, sarama.OffsetNewest)
+		if err != nil {
+			return 0, fmt.Errorf("failed to get topic/partition offsets for partition %d: %w", partition, err)
+		}
+		block := rep.GetBlock(s.source.Topic, partition)
+		x := partitionOffset - block.Offset - 1
+		if x > 0 {
+			totalLags += x
+		}
+	}
+	return uint64(totalLags), nil
+}
diff --git a/runner/sidecar/source/source.go b/runner/sidecar/source/source.go
new file mode 100644
index 00000000..c4bfabd6
--- /dev/null
+++ b/runner/sidecar/source/source.go
@@ -0,0 +1,11 @@
+package source
+
+import "context"
+
+type Interface interface{}
+
+type Func func(ctx context.Context, msg []byte) error
+
+type HasPending interface {
+	GetPending(ctx context.Context) (uint64, error)
+}
diff --git a/runner/sidecar/source/stan/stan.go b/runner/sidecar/source/stan/stan.go
new file mode 100644
index 00000000..cd732a2f
--- /dev/null
+++ b/runner/sidecar/source/stan/stan.go
@@ -0,0 +1,182 @@
+package stan
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"time"
+
+	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
+	sharedstan "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
+	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
+	sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
+	"github.com/nats-io/stan.go"
+	"github.com/nats-io/stan.go/pb"
+	runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/kubernetes"
+)
+
+var logger = sharedutil.NewLogger()
+
+type stanSource struct {
+	sub               stan.Subscription
+	conn              *sharedstan.Conn
+	subject           string
+	natsMonitoringURL string
+	queueName         string
+}
+
+func New(ctx context.Context, kubernetesInterface kubernetes.Interface, namespace, pipelineName, stepName string, replica int, sourceName string, x dfv1.STAN, f source.Func) (source.Interface, error) {
+	genClientID := func() string {
+		// In a particular situation, the stan connection status is inconsistent between the stan server and client:
+		// the connection is lost from the client side, but the server still thinks it's alive. In that case,
+		// reconnecting with the same client ID will fail. To avoid that, add a random number to the client ID string.
+ s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + return fmt.Sprintf("%s-%s-%d-source-%s-%v", pipelineName, stepName, replica, sourceName, r1.Intn(100)) + } + + var conn *sharedstan.Conn + var err error + clientID := genClientID() + conn, err = sharedstan.ConnectSTAN(ctx, kubernetesInterface, namespace, x, clientID) + if err != nil { + return nil, err + } + + // https://docs.nats.io/developing-with-nats-streaming/queues + var sub stan.Subscription + queueName := fmt.Sprintf("%s-%s-source-%s", pipelineName, stepName, sourceName) + subFunc := func() (stan.Subscription, error) { + sub, err := conn.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) { + if err := f(context.Background(), msg.Data); err != nil { + // noop + } else if err := msg.Ack(); err != nil { + logger.Error(err, "failed to ack message", "source", sourceName) + } + }, stan.DurableName(queueName), + stan.SetManualAckMode(), + stan.StartAt(pb.StartPosition_NewOnly), + stan.AckWait(30*time.Second), + stan.MaxInflight(x.GetMaxInflight())) + if err != nil { + return nil, fmt.Errorf("failed to subscribe: %w", err) + } + return sub, nil + } + + sub, err = subFunc() + if err != nil { + return nil, err + } + go func() { + defer runtimeutil.HandleCrash() + logger.Info("starting stan auto reconnection daemon", "source", sourceName) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + logger.Info("exiting stan auto reconnection daemon", "source", sourceName) + return + case <-ticker.C: + if conn == nil || conn.IsClosed() { + _ = sub.Close() + logger.Info("stan connection lost, reconnecting...", "source", sourceName) + clientID := genClientID() + conn, err = sharedstan.ConnectSTAN(ctx, kubernetesInterface, namespace, x, clientID) + if err != nil { + logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID) + continue + } + logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID) + if sub, err = subFunc(); err != nil { + logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID) + // Close the connection to let it retry + _ = conn.Close() + } + } + } + } + }() + + return stanSource{ + conn: conn, + sub: sub, + subject: x.Subject, + natsMonitoringURL: x.NATSMonitoringURL, + queueName: queueName, + }, nil +} + +func (s stanSource) Close() error { + logger.Info("closing stan subscription") + if err := s.sub.Close(); err != nil { + return err + } + logger.Info("closing stan connection") + return s.conn.Close() +} + +func (s stanSource) GetPending(ctx context.Context) (uint64, error) { + httpClient := http.Client{ + Timeout: time.Second * 3, + } + + type obj = map[string]interface{} + + pendingMessages := func(ctx context.Context, channel, queueNameCombo string) (int64, error) { + monitoringEndpoint := fmt.Sprintf("%s/streaming/channelsz?channel=%s&subs=1", s.natsMonitoringURL, channel) + req, err := http.NewRequestWithContext(ctx, "GET", monitoringEndpoint, nil) + if err != nil { + return 0, err + } + resp, err := httpClient.Do(req) + if err != nil { + return 0, err + } + if resp.StatusCode != 200 { + return 0, fmt.Errorf("invalid response: %s", resp.Status) + } + defer func() { _ = resp.Body.Close() }() + o := make(obj) + if err := json.NewDecoder(resp.Body).Decode(&o); err != nil { + return 0, err + } + lastSeq, ok := o["last_seq"].(float64) + if !ok { + return 0, fmt.Errorf("unrecognized last_seq: %v", o["last_seq"]) + } + subs, ok := o["subscriptions"] + if !ok 
{
+			return 0, fmt.Errorf("no subscriptions field found in the monitoring endpoint response")
+		}
+		maxLastSent := float64(0)
+		for _, i := range subs.([]interface{}) {
+			s := i.(obj)
+			if fmt.Sprintf("%v", s["queue_name"]) != queueNameCombo {
+				continue
+			}
+			lastSent, ok := s["last_sent"].(float64)
+			if !ok {
+				return 0, fmt.Errorf("unrecognized last_sent: %v", s["last_sent"])
+			}
+			if lastSent > maxLastSent {
+				maxLastSent = lastSent
+			}
+		}
+		return int64(lastSeq) - int64(maxLastSent), nil
+	}
+
+	// queueNameCombo := {durableName}:{queueGroup}
+	queueNameCombo := s.queueName + ":" + s.queueName
+	if pending, err := pendingMessages(ctx, s.subject, queueNameCombo); err != nil {
+		return 0, fmt.Errorf("failed to get STAN pending: %w", err)
+	} else if pending >= 0 {
+		logger.Info("setting STAN pending", "pending", pending)
+		return uint64(pending), nil
+	}
+	return 0, nil
+}
diff --git a/runner/sidecar/sources.go b/runner/sidecar/sources.go
index 01aaf236..4eb48bae 100644
--- a/runner/sidecar/sources.go
+++ b/runner/sidecar/sources.go
@@ -2,68 +2,50 @@ package sidecar
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
-	"io/ioutil"
-	"math/rand"
-	"net/http"
+	"io"
 	"time"
 
-	"github.com/segmentio/kafka-go"
+	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
+	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/cron"
+	httpsource "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/http"
+	kafkasource "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/kafka"
+	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/stan"
 
-	"github.com/Shopify/sarama"
 	dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
-	sharedstan "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
 	sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
-	"github.com/nats-io/stan.go"
-	"github.com/nats-io/stan.go/pb"
 	"github.com/paulbellamy/ratecounter"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/robfig/cron/v3"
-	runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-)
-
-var crn = cron.New(
-	cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
-	cron.WithChain(cron.Recover(logger)),
 )
 
 func connectSources(ctx context.Context, toMain func(context.Context, []byte) error) error {
-	go crn.Run()
-	addPreStopHook(func(ctx context.Context) error {
-		logger.Info("stopping cron")
-		<-crn.Stop().Done()
-		return nil
-	})
-	sources := make(map[string]bool)
-	for _, source := range step.Spec.Sources {
-		logger.Info("connecting source", "source", sharedutil.MustJSON(source))
-		sourceName := source.Name
+	sources := make(map[string]source.Interface)
+	for _, s := range step.Spec.Sources {
+		logger.Info("connecting source", "source", sharedutil.MustJSON(s))
+		sourceName := s.Name
 		if _, exists := sources[sourceName]; exists {
 			return fmt.Errorf("duplicate source named %q", sourceName)
 		}
-		sources[sourceName] = true
 		if leadReplica() { // only replica zero updates this value, so it the only replica that can be accurate
-			newSourceMetrics(source, sourceName)
+			newSourceMetrics(s, sourceName)
 		}
 		rateCounter := ratecounter.NewRateCounter(updateInterval)
-		logger.Info("retry config", "source", sourceName, "backoff", source.Retry)
+		logger.Info("retry config", "source", sourceName, "backoff", s.Retry)
 		f := func(ctx context.Context, msg []byte) error {
 			rateCounter.Incr(1)
 			withLock(func() { 
step.Status.SourceStatuses.IncrTotal(sourceName, replica, printable(msg), rateToResourceQuantity(rateCounter)) }) - backoff := newBackoff(source.Retry) + backoff := newBackoff(s.Retry) for { select { case <-ctx.Done(): return fmt.Errorf("could not send message: %w", ctx.Err()) default: - if uint64(backoff.Steps) < source.Retry.Steps { // this is a retry + if uint64(backoff.Steps) < s.Retry.Steps { // this is a retry logger.Info("retry", "source", sourceName, "backoff", backoff) withLock(func() { step.Status.SourceStatuses.IncrRetries(sourceName, replica) }) } @@ -80,289 +62,48 @@ func connectSources(ctx context.Context, toMain func(context.Context, []byte) er } } } - if x := source.Cron; x != nil { - if err := connectCronSource(x, f); err != nil { + if x := s.Cron; x != nil { + if y, err := cron.New(*x, f); err != nil { return err + } else { + sources[sourceName] = y } - } else if x := source.STAN; x != nil { - if err := connectSTANSource(ctx, sourceName, x, f); err != nil { + } else if x := s.STAN; x != nil { + if y, err := stan.New(ctx, kubernetesInterface, namespace, pipelineName, stepName, replica, sourceName, *x, f); err != nil { return err + } else { + sources[sourceName] = y } - } else if x := source.Kafka; x != nil { - if err := connectKafkaSource(ctx, x, sourceName, f); err != nil { + } else if x := s.Kafka; x != nil { + if y, err := kafkasource.New(ctx, pipelineName, stepName, sourceName, *x, f); err != nil { return err + } else { + sources[sourceName] = y } - } else if x := source.HTTP; x != nil { - connectHTTPSource(sourceName, f) + } else if x := s.HTTP; x != nil { + sources[sourceName] = httpsource.New(sourceName, f) } else { return fmt.Errorf("source misconfigured") } - } - return nil -} - -func connectHTTPSource(sourceName string, f func(ctx context.Context, msg []byte) error) { - http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) { - if !ready { // if we are not ready, we cannot serve requests - w.WriteHeader(503) - _, _ = w.Write([]byte("not ready")) - return - } - msg, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(400) - _, _ = w.Write([]byte(err.Error())) - return - } - if err := f(context.Background(), msg); err != nil { - w.WriteHeader(500) - _, _ = w.Write([]byte(err.Error())) - } else { - w.WriteHeader(204) - } - }) -} - -func connectKafkaSource(ctx context.Context, x *dfv1.Kafka, sourceName string, f func(ctx context.Context, msg []byte) error) error { - groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic - reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: x.Brokers, - Dialer: newKafkaDialer(x), - GroupID: groupName, - Topic: x.Topic, - }) - addPreStopHook(func(ctx context.Context) error { - logger.Info("closing kafka reader", "source", sourceName) - return reader.Close() - }) - go wait.JitterUntil(func() { - ctx := context.Background() - for { - m, err := reader.ReadMessage(ctx) - if err != nil { - logger.Error(err, "failed to read kafka message", "source", sourceName) - } else { - _ = f(ctx, m.Value) - } - } - }, 3*time.Second, 1.2, true, ctx.Done()) - if leadReplica() { - if err := registerKafkaSetPendingHook(x, sourceName, groupName); err != nil { - return err - } - } - return nil -} - -func registerKafkaSetPendingHook(x *dfv1.Kafka, sourceName string, groupName string) error { - config, err := newKafkaConfig(x) - if err != nil { - return err - } - prePatchHooks = append(prePatchHooks, func(ctx context.Context) error { - adminClient, err := 
sarama.NewClusterAdmin(x.Brokers, config) - if err != nil { - return err - } - defer func() { - if err := adminClient.Close(); err != nil { - logger.Error(err, "failed to close Kafka admin client", "source", sourceName) - } - }() - client, err := sarama.NewClient(x.Brokers, config) // I am not giving any configuration - if err != nil { - return err - } - defer func() { - if err := client.Close(); err != nil { - logger.Error(err, "failed to close Kafka client", "source", sourceName) - } - }() - partitions, err := client.Partitions(x.Topic) - if err != nil { - return fmt.Errorf("failed to get partitions for %q: %w", sourceName, err) - } - totalLags := int64(0) - rep, err := adminClient.ListConsumerGroupOffsets(groupName, map[string][]int32{x.Topic: partitions}) - if err != nil { - return fmt.Errorf("failed to list consumer group offsets for %q: %w", sourceName, err) - } - for _, partition := range partitions { - partitionOffset, err := client.GetOffset(x.Topic, partition, sarama.OffsetNewest) - if err != nil { - return fmt.Errorf("failed to get topic/partition offsets for %q partition %q: %w", sourceName, partition, err) - } - block := rep.GetBlock(x.Topic, partition) - x := partitionOffset - block.Offset - 1 - if x > 0 { - totalLags += x - } - } - logger.Info("setting pending", "source", sourceName, "pending", totalLags) - withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, uint64(totalLags)) }) - return nil - }) - return nil -} - -func connectSTANSource(ctx context.Context, sourceName string, x *dfv1.STAN, f func(ctx context.Context, msg []byte) error) error { - genClientID := func() string { - // In a particular situation, the stan connection status is inconsistent between stan server and client, - // the connection is lost from client side, but the server still thinks it's alive. In this case, use - // the same client ID to reconnect will fail. To avoid that, add a random number in the client ID string. 
- s1 := rand.NewSource(time.Now().UnixNano()) - r1 := rand.New(s1) - return fmt.Sprintf("%s-%s-%d-source-%s-%v", pipelineName, stepName, replica, sourceName, r1.Intn(100)) - } - - var conn *sharedstan.Conn - var err error - clientID := genClientID() - conn, err = sharedstan.ConnectSTAN(ctx, kubernetesInterface, namespace, *x, clientID) - if err != nil { - return fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err) - } - addPreStopHook(func(ctx context.Context) error { - logger.Info("closing stan connection", "source", sourceName) - return conn.Close() - }) - - // https://docs.nats.io/developing-with-nats-streaming/queues - var sub stan.Subscription - queueName := fmt.Sprintf("%s-%s-source-%s", pipelineName, stepName, sourceName) - subFunc := func() (stan.Subscription, error) { - sub, err := conn.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) { - if err := f(context.Background(), msg.Data); err != nil { - // noop - } else if err := msg.Ack(); err != nil { - logger.Error(err, "failed to ack message", "source", sourceName) - } - }, stan.DurableName(queueName), - stan.SetManualAckMode(), - stan.StartAt(pb.StartPosition_NewOnly), - stan.AckWait(30*time.Second), - stan.MaxInflight(x.GetMaxInflight())) - if err != nil { - return nil, fmt.Errorf("failed to subscribe: %w", err) + if x, ok := sources[sourceName].(io.Closer); ok { + logger.Info("adding pre-stop hook", "source", sourceName) + addStopHook(func(ctx context.Context) error { + logger.Info("closing", "source", sourceName) + return x.Close() + }) } - return sub, nil - } - - sub, err = subFunc() - if err != nil { - return err - } - addPreStopHook(func(ctx context.Context) error { - logger.Info("closing stan subscription", "source", sourceName) - return sub.Close() - }) - if leadReplica() { - registerSTANSetPendingHook(sourceName, x, queueName) - } - - go func() { - defer runtimeutil.HandleCrash() - logger.Info("starting stan auto reconnection daemon", "source", sourceName) - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - logger.Info("exiting stan auto reconnection daemon", "source", sourceName) - return - case <-ticker.C: - if conn == nil || conn.IsClosed() { - _ = sub.Close() - logger.Info("stan connection lost, reconnecting...", "source", sourceName) - clientID := genClientID() - conn, err = sharedstan.ConnectSTAN(ctx, kubernetesInterface, namespace, *x, clientID) - if err != nil { - logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID) - continue - } - logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID) - if sub, err = subFunc(); err != nil { - logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID) - // Close the connection to let it retry - _ = conn.Close() - } + if x, ok := sources[sourceName].(source.HasPending); ok && leadReplica() { + logger.Info("adding pre-patch hook", "source", sourceName) + prePatchHooks = append(prePatchHooks, func(ctx context.Context) error { + logger.Info("getting pending", "source", sourceName) + if pending, err := x.GetPending(ctx); err != nil { + return err + } else { + withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, pending) }) } - } - } - }() - - return nil -} - -func registerSTANSetPendingHook(sourceName string, x *dfv1.STAN, queueName string) { - httpClient := http.Client{ - Timeout: time.Second * 3, - } - - type obj = 
map[string]interface{} - - pendingMessages := func(ctx context.Context, channel, queueNameCombo string) (int64, error) { - monitoringEndpoint := fmt.Sprintf("%s/streaming/channelsz?channel=%s&subs=1", x.NATSMonitoringURL, channel) - req, err := http.NewRequestWithContext(ctx, "GET", monitoringEndpoint, nil) - if err != nil { - return 0, err - } - resp, err := httpClient.Do(req) - if err != nil { - return 0, err - } - if resp.StatusCode != 200 { - return 0, fmt.Errorf("invalid response: %s", resp.Status) - } - defer func() { _ = resp.Body.Close() }() - o := make(obj) - if err := json.NewDecoder(resp.Body).Decode(&o); err != nil { - return 0, err - } - lastSeq, ok := o["last_seq"].(float64) - if !ok { - return 0, fmt.Errorf("unrecognized last_seq: %v", o["last_seq"]) - } - subs, ok := o["subscriptions"] - if !ok { - return 0, fmt.Errorf("no suscriptions field found in the monitoring endpoint response") - } - maxLastSent := float64(0) - for _, i := range subs.([]interface{}) { - s := i.(obj) - if fmt.Sprintf("%v", s["queue_name"]) != queueNameCombo { - continue - } - lastSent, ok := s["last_sent"].(float64) - if !ok { - return 0, fmt.Errorf("unrecognized last_sent: %v", s["last_sent"]) - } - if lastSent > maxLastSent { - maxLastSent = lastSent - } - } - return int64(lastSeq) - int64(maxLastSent), nil - } - prePatchHooks = append(prePatchHooks, func(ctx context.Context) error { - // queueNameCombo := {durableName}:{queueGroup} - queueNameCombo := queueName + ":" + queueName - if pending, err := pendingMessages(ctx, x.Subject, queueNameCombo); err != nil { - return fmt.Errorf("failed to get pending for %q: %w", sourceName, err) - } else if pending >= 0 { - logger.Info("setting pending", "source", sourceName, "pending", pending) - withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, uint64(pending)) }) + return nil + }) } - return nil - }) -} - -func connectCronSource(x *dfv1.Cron, f func(ctx context.Context, msg []byte) error) error { - _, err := crn.AddFunc(x.Schedule, func() { - msg := []byte(time.Now().Format(x.Layout)) - _ = f(context.Background(), msg) - }) - if err != nil { - return fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err) } return nil }
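
Note on the new source contract: a source constructor only needs to accept a source.Func callback and return a source.Interface; connectSources then discovers optional capabilities by type assertion, registering a stop hook when the value is an io.Closer and a pre-patch pending hook when it implements source.HasPending. A minimal sketch of what an additional source could look like against that contract (the tick package name, interval and payload format below are illustrative assumptions, not part of this change):

package tick // hypothetical example source, not part of this change

import (
	"context"
	"time"

	"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
)

type tickSource struct {
	stop chan struct{}
}

// New emits the current time to the callback at a fixed interval.
func New(interval time.Duration, f source.Func) source.Interface {
	s := &tickSource{stop: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-s.stop:
				return
			case t := <-ticker.C:
				_ = f(context.Background(), []byte(t.Format(time.RFC3339)))
			}
		}
	}()
	return s
}

// Close makes the source an io.Closer, so connectSources registers a stop hook for it.
// A source that can also report backlog would additionally implement source.HasPending,
// i.e. GetPending(ctx context.Context) (uint64, error), as the Kafka and STAN sources above do.
func (s *tickSource) Close() error {
	close(s.stop)
	return nil
}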
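
For reference, the HTTP source above serves POSTs at /sources/<sourceName> and replies 204 when the message was handled, 400 for an unreadable body, 500 when the pipeline callback fails, and 503 once the source has been closed (not ready). A rough client sketch, assuming a placeholder base URL since the sidecar's listen address and TLS settings are not part of this diff:

package main // illustrative client only

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Placeholder address: substitute the real sidecar endpoint for your deployment.
	url := "http://localhost:8080/sources/my-source"
	resp, err := http.Post(url, "application/octet-stream", bytes.NewReader([]byte("hello")))
	if err != nil {
		panic(err)
	}
	defer func() { _ = resp.Body.Close() }()
	switch resp.StatusCode {
	case 204:
		fmt.Println("message accepted")
	case 503:
		fmt.Println("sidecar not ready, retry later")
	default:
		fmt.Println("unexpected status:", resp.Status)
	}
}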