Skip to content

Commit

Permalink
refactor: refactor sources
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jul 16, 2021
1 parent 233f5b9 commit 5544da5
Show file tree
Hide file tree
Showing 13 changed files with 463 additions and 349 deletions.
35 changes: 0 additions & 35 deletions runner/sidecar/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package sidecar

import (
"context"
"crypto/tls"
"fmt"
"strings"
"time"

"github.com/Shopify/sarama"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
runnerutil "github.com/argoproj-labs/argo-dataflow/runner/util"
"github.com/segmentio/kafka-go"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,37 +26,6 @@ func kafkaFromSecret(k *dfv1.Kafka, secret *corev1.Secret) error {
return nil
}

func newKafkaConfig(k *dfv1.Kafka) (*sarama.Config, error) {
x := sarama.NewConfig()
x.ClientID = dfv1.CtrSidecar
if k.Version != "" {
v, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka version %q: %w", k.Version, err)
}
x.Version = v
}
if k.NET != nil {
if k.NET.TLS != nil {
x.Net.TLS.Enable = true
}
}
return x, nil
}

func newKafkaDialer(k *dfv1.Kafka) *kafka.Dialer {
var t *tls.Config
if k.NET != nil && k.NET.TLS != nil {
t = &tls.Config{}
}
return &kafka.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 20 * time.Second,
DualStack: true,
TLS: t,
}
}

func enrichKafka(ctx context.Context, secrets v1.SecretInterface, x *dfv1.Kafka) error {
secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions runner/sidecar/shared/stan/stan_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stan
import (
"context"
"fmt"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/nats-io/nats.go"
Expand Down
8 changes: 5 additions & 3 deletions runner/sidecar/sink/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
"io"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"net/http"
"time"
)

type httpSink struct {
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package kafka
import (
"context"
"crypto/tls"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
"github.com/segmentio/kafka-go"
"time"
)

type kafkaSink struct {
Expand Down Expand Up @@ -36,7 +37,6 @@ func New(x dfv1.Kafka) sink.Interface {

func (h kafkaSink) Sink(msg []byte) error {
return h.writer.WriteMessages(context.Background(), kafka.Message{Value: msg})

}

func (h kafkaSink) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/sink/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package logsink

import (
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
"github.com/argoproj-labs/argo-dataflow/shared/util"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
)

var logger = util.NewLogger()
var logger = sharedutil.NewLogger()

type logSink struct{}

Expand Down
5 changes: 3 additions & 2 deletions runner/sidecar/sink/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package stan
import (
"context"
"fmt"
"math/rand"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"math/rand"
"time"
)

var logger = sharedutil.NewLogger()
Expand Down
5 changes: 3 additions & 2 deletions runner/sidecar/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package sidecar
import (
"context"
"fmt"
"io"

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/http"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/kafka"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/log"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/stan"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/paulbellamy/ratecounter"
"io"
)

func connectSinks(ctx context.Context) (func([]byte) error, error) {
Expand Down Expand Up @@ -45,7 +46,7 @@ func connectSinks(ctx context.Context) (func([]byte) error, error) {
if closer, ok := sinks[sinkName].(io.Closer); ok {
logger.Info("adding stop hook", "sink", sinkName)
addStopHook(func(ctx context.Context) error {
logger.Info("stopping sink", "sink", sinkName)
logger.Info("closing", "sink", sinkName)
return closer.Close()
})
}
Expand Down
41 changes: 41 additions & 0 deletions runner/sidecar/source/cron/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cron

import (
"context"
"fmt"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/robfig/cron/v3"
)

var logger = sharedutil.NewLogger()

type cronSource struct {
crn *cron.Cron
}

func New(x dfv1.Cron, f source.Func) (source.Interface, error) {
crn := cron.New(
cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
cron.WithChain(cron.Recover(logger)),
)

go crn.Run()

_, err := crn.AddFunc(x.Schedule, func() {
msg := []byte(time.Now().Format(x.Layout))
_ = f(context.Background(), msg)
})
if err != nil {
return nil, fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err)
}
return cronSource{crn: crn}, nil
}

func (s cronSource) Close() error {
<-s.crn.Stop().Done()
return nil
}
42 changes: 42 additions & 0 deletions runner/sidecar/source/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package http

import (
"context"
"io/ioutil"
"net/http"

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
)

type httpSource struct {
ready bool
}

func New(sourceName string, f source.Func) source.Interface {
h := &httpSource{ready: true}
http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) {
if !h.ready { // if we are not ready, we cannot serve requests
w.WriteHeader(503)
_, _ = w.Write([]byte("not ready"))
return
}
msg, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(400)
_, _ = w.Write([]byte(err.Error()))
return
}
if err := f(context.Background(), msg); err != nil {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
} else {
w.WriteHeader(204)
}
})
return h
}

func (s *httpSource) Close() error {
s.ready = false
return nil
}
127 changes: 127 additions & 0 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package kafka

import (
"context"
"crypto/tls"
"fmt"
"time"

"github.com/Shopify/sarama"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/segmentio/kafka-go"
"k8s.io/apimachinery/pkg/util/wait"
)

var logger = sharedutil.NewLogger()

type kafkaSource struct {
source dfv1.Kafka
reader *kafka.Reader
groupName string
}

func New(ctx context.Context, pipelineName, stepName, sourceName string, x dfv1.Kafka, f source.Func) (source.Interface, error) {
groupName := pipelineName + "-" + stepName + "-source-" + sourceName + "-" + x.Topic
var t *tls.Config
if x.NET != nil && x.NET.TLS != nil {
t = &tls.Config{}
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 20 * time.Second,
DualStack: true,
TLS: t,
}
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: x.Brokers,
Dialer: dialer,
GroupID: groupName,
Topic: x.Topic,
})
go wait.JitterUntil(func() {
ctx := context.Background()
for {
m, err := reader.ReadMessage(ctx)
if err != nil {
logger.Error(err, "failed to read kafka message", "source", sourceName)
} else {
_ = f(ctx, m.Value)
}
}
}, 3*time.Second, 1.2, true, ctx.Done())
return kafkaSource{
reader: reader,
source: x,
groupName: groupName,
}, nil
}

func (s kafkaSource) Close() error {
return s.reader.Close()
}

func newKafkaConfig(k dfv1.Kafka) (*sarama.Config, error) {
x := sarama.NewConfig()
x.ClientID = dfv1.CtrSidecar
if k.Version != "" {
v, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka version %q: %w", k.Version, err)
}
x.Version = v
}
if k.NET != nil {
if k.NET.TLS != nil {
x.Net.TLS.Enable = true
}
}
return x, nil
}

func (s kafkaSource) GetPending() (uint64, error) {
config, err := newKafkaConfig(s.source)
if err != nil {
return 0, err
}
adminClient, err := sarama.NewClusterAdmin(s.source.Brokers, config)
if err != nil {
return 0, err
}
defer func() {
if err := adminClient.Close(); err != nil {
logger.Error(err, "failed to close Kafka admin client")
}
}()
client, err := sarama.NewClient(s.source.Brokers, config) // I am not giving any configuration
if err != nil {
return 0, err
}
defer func() {
if err := client.Close(); err != nil {
logger.Error(err, "failed to close Kafka client")
}
}()
partitions, err := client.Partitions(s.source.Topic)
if err != nil {
return 0, fmt.Errorf("failed to get partitions: %w", err)
}
totalLags := int64(0)
rep, err := adminClient.ListConsumerGroupOffsets(s.groupName, map[string][]int32{s.source.Topic: partitions})
if err != nil {
return 0, fmt.Errorf("failed to list consumer group offsets: %w", err)
}
for _, partition := range partitions {
partitionOffset, err := client.GetOffset(s.source.Topic, partition, sarama.OffsetNewest)
if err != nil {
return 0, fmt.Errorf("failed to get topic/partition offsets partition %q: %w", partition, err)
}
block := rep.GetBlock(s.source.Topic, partition)
x := partitionOffset - block.Offset - 1
if x > 0 {
totalLags += x
}
}
return uint64(totalLags), nil
}
11 changes: 11 additions & 0 deletions runner/sidecar/source/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package source

import "context"

type Interface interface{}

type Func func(ctx context.Context, msg []byte) error

type HasPending interface {
GetPending(ctx context.Context) (uint64, error)
}
Loading

0 comments on commit 5544da5

Please sign in to comment.