Skip to content

Commit

Permalink
chore(sidecar): Refactor sidecar (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Jun 16, 2021
1 parent 9caba99 commit 0fc71c2
Show file tree
Hide file tree
Showing 16 changed files with 878 additions and 685 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ linters:
enable:
- deadcode
- errcheck
- gofumpt
- gosimple
- govet
- misspell
Expand Down
6 changes: 3 additions & 3 deletions runner/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/util"
util2 "github.com/argoproj-labs/argo-dataflow/shared/util"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"

"github.com/antonmedv/expr"
"github.com/google/uuid"
Expand All @@ -32,7 +32,7 @@ func withLock(dir string, f func() ([]byte, error)) ([]byte, error) {
}

func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.GroupFormat) error {
if err := os.Mkdir(dfv1.PathGroups, 0o700); util2.IgnoreExist(err) != nil {
if err := os.Mkdir(dfv1.PathGroups, 0o700); sharedutil.IgnoreExist(err) != nil {
return fmt.Errorf("failed to create groups dir: %w", err)
}
prog, err := expr.Compile(key)
Expand All @@ -54,7 +54,7 @@ func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.G
}
dir := filepath.Join(dfv1.PathGroups, group)
return withLock(dir, func() ([]byte, error) {
if err := os.MkdirAll(dir, 0o700); util2.IgnoreExist(err) != nil {
if err := os.MkdirAll(dir, 0o700); sharedutil.IgnoreExist(err) != nil {
return nil, fmt.Errorf("failed to create group sub-dir %q: %w", dir, err)
}
path := filepath.Join(dir, uuid.New().String())
Expand Down
17 changes: 8 additions & 9 deletions runner/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"path/filepath"
"syscall"

util2 "github.com/argoproj-labs/argo-dataflow/shared/util"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"k8s.io/utils/strings"
)

var logger = util2.NewLogger()
var logger = sharedutil.NewLogger()

// due to main container crashing, the init container may be started many times, so each operation we perform should be
// idempontent, i.e. if we copy a file to shared volume, and it already exists, we should ignore that error
Expand All @@ -29,25 +28,25 @@ func Exec() error {
return fmt.Errorf("failed to open %s: %w", a, err)
}
dst, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0o500)
if util2.IgnorePermission(util2.IgnoreExist(err)) != nil {
if sharedutil.IgnorePermission(sharedutil.IgnoreExist(err)) != nil {
return fmt.Errorf("failed to open %s: %w", name, err)
} else if err == nil {
if _, err := io.Copy(dst, src); util2.IgnoreExist(err) != nil {
if _, err := io.Copy(dst, src); sharedutil.IgnoreExist(err) != nil {
return fmt.Errorf("failed to create input FIFO: %w", err)
}
}
}
step := dfv1.Step{}
util2.MustUnJSON(os.Getenv(dfv1.EnvStep), &step)
sharedutil.MustUnJSON(os.Getenv(dfv1.EnvStep), &step)

if step.Spec.GetIn().FIFO {
logger.Info("creating in fifo")
if err := syscall.Mkfifo(dfv1.PathFIFOIn, 0o600); util2.IgnoreExist(err) != nil {
if err := syscall.Mkfifo(dfv1.PathFIFOIn, 0o600); sharedutil.IgnoreExist(err) != nil {
return fmt.Errorf("failed to create input FIFO: %w", err)
}
}
logger.Info("creating out fifo")
if err := syscall.Mkfifo(dfv1.PathFIFOOut, 0o600); util2.IgnoreExist(err) != nil {
if err := syscall.Mkfifo(dfv1.PathFIFOOut, 0o600); sharedutil.IgnoreExist(err) != nil {
return fmt.Errorf("failed to create output FIFO: %w", err)
}
if g := step.Spec.Git; g != nil {
Expand All @@ -66,7 +65,7 @@ func Exec() error {
return fmt.Errorf("failed to stat %s: %w", path, err)
}
logger.Info("moving checked out code", "path", path, "wd", dfv1.PathWorkingDir)
if err := os.Rename(path, dfv1.PathWorkingDir); util2.IgnoreExist(err) != nil {
if err := os.Rename(path, dfv1.PathWorkingDir); sharedutil.IgnoreExist(err) != nil {
return fmt.Errorf("failed to moved checked out path to working dir: %w", err)
}
} else if h := step.Spec.Handler; h != nil {
Expand Down
10 changes: 4 additions & 6 deletions runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,20 @@ import (
"os/signal"
"syscall"

util2 "github.com/argoproj-labs/argo-dataflow/shared/util"

"github.com/argoproj-labs/argo-dataflow/runner/expand"
"github.com/argoproj-labs/argo-dataflow/runner/flatten"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/cat"
"github.com/argoproj-labs/argo-dataflow/runner/expand"
"github.com/argoproj-labs/argo-dataflow/runner/filter"
"github.com/argoproj-labs/argo-dataflow/runner/flatten"
"github.com/argoproj-labs/argo-dataflow/runner/group"
_init "github.com/argoproj-labs/argo-dataflow/runner/init"
_map "github.com/argoproj-labs/argo-dataflow/runner/map"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar"
"github.com/argoproj-labs/argo-dataflow/runner/sleep"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
)

var logger = util2.NewLogger()
var logger = sharedutil.NewLogger()

func main() {
ctx := setupSignalsHandler(context.Background())
Expand Down
120 changes: 120 additions & 0 deletions runner/sidecar/in.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package sidecar

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

func connectIn(ctx context.Context, sink func([]byte) error) (func(context.Context, []byte) error, error) {
inFlight := promauto.NewGauge(prometheus.GaugeOpts{
Subsystem: "input",
Name: "inflight",
Help: "Number of in-flight messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#input_inflight",
ConstLabels: map[string]string{"replica": strconv.Itoa(replica)},
})
messageTimeSeconds := promauto.NewHistogram(prometheus.HistogramOpts{
Subsystem: "input",
Name: "message_time_seconds",
Help: "Message time, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#input_message_time_seconds",
ConstLabels: map[string]string{"replica": strconv.Itoa(replica)},
})
in := step.Spec.GetIn()
if in == nil {
logger.Info("no in interface configured")
return func(context.Context, []byte) error {
return fmt.Errorf("no in interface configured")
}, nil
} else if in.FIFO {
logger.Info("opened input FIFO")
fifo, err := os.OpenFile(dfv1.PathFIFOIn, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
return nil, fmt.Errorf("failed to open input FIFO: %w", err)
}
afterClosers = append(afterClosers, func(ctx context.Context) error {
logger.Info("closing FIFO")
return fifo.Close()
})
return func(ctx context.Context, data []byte) error {
inFlight.Inc()
defer inFlight.Dec()
if _, err := fifo.Write(data); err != nil {
return fmt.Errorf("failed to send to main: %w", err)
}
if _, err := fifo.Write([]byte("\n")); err != nil {
return fmt.Errorf("failed to send to main: %w", err)
}
return nil
}, nil
} else if in.HTTP != nil {
logger.Info("HTTP in interface configured")
if err := waitReady(ctx); err != nil {
return nil, err
}
afterClosers = append(afterClosers, func(ctx context.Context) error {
return waitUnready(ctx)
})
return func(ctx context.Context, data []byte) error {
inFlight.Inc()
defer inFlight.Dec()
start := time.Now()
defer func() { messageTimeSeconds.Observe(time.Since(start).Seconds()) }()
if resp, err := http.Post("http://localhost:8080/messages", "application/octet-stream", bytes.NewBuffer(data)); err != nil {
return fmt.Errorf("failed to send to main: %w", err)
} else {
body, _ := ioutil.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
}
if resp.StatusCode == 201 {
return sink(body)
}
}
return nil
}, nil
} else {
return nil, fmt.Errorf("in interface misconfigured")
}
}

func waitReady(ctx context.Context) error {
logger.Info("waiting for HTTP in interface to be ready")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode < 300 {
logger.Info("HTTP in interface ready")
return nil
}
time.Sleep(3 * time.Second)
}
}
}

func waitUnready(ctx context.Context) error {
logger.Info("waiting for HTTP in interface to be unready")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode >= 300 {
logger.Info("HTTP in interface unready")
return nil
}
time.Sleep(3 * time.Second)
}
}
}
59 changes: 59 additions & 0 deletions runner/sidecar/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package sidecar

import (
"context"
"fmt"
"strings"

apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/Shopify/sarama"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
runnerutil "github.com/argoproj-labs/argo-dataflow/runner/util"
corev1 "k8s.io/api/core/v1"
)

func init() {
sarama.Logger = runnerutil.NewSaramaStdLogger(logger)
}

func kafkaFromSecret(k *dfv1.Kafka, secret *corev1.Secret) {
k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
k.Version = dfv1.StringOr(k.Version, string(secret.Data["version"]))
if _, ok := secret.Data["net.tls"]; ok {
k.NET = &dfv1.KafkaNET{TLS: &dfv1.TLS{}}
}
}

func newKafkaConfig(k *dfv1.Kafka) (*sarama.Config, error) {
x := sarama.NewConfig()
x.ClientID = dfv1.CtrSidecar
if k.Version != "" {
v, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka version %q: %w", k.Version, err)
}
x.Version = v
}
if k.NET != nil {
if k.NET.TLS != nil {
x.Net.TLS.Enable = true
}
}
return x, nil
}

func enrichKafka(ctx context.Context, secrets v1.SecretInterface, x *dfv1.Kafka) error {
secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{})
if err != nil {
if !apierr.IsNotFound(err) {
return err
}
} else {
kafkaFromSecret(x, secret)
}
return nil
}

40 changes: 40 additions & 0 deletions runner/sidecar/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sidecar

import (
"context"
"sync"
"time"
)

var (
preStopCh = make(chan bool, 16)
beforeClosers []func(ctx context.Context) error // should be closed before main container exits
afterClosers []func(ctx context.Context) error // should be close after the main container exits
preStopMu = sync.Mutex{}
)

func preStop() {
logger.Info("pre-stop")
preStopMu.Lock()
defer preStopMu.Unlock()
closeClosers(beforeClosers)
beforeClosers = nil
preStopCh <- true
logger.Info("pre-stop done")
}

func stop() {
closeClosers(afterClosers)
}

func closeClosers(closers []func(ctx context.Context) error) {
logger.Info("closing closers", "len", len(closers))
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
for i := len(closers) - 1; i >= 0; i-- {
logger.Info("closing", "i", i)
if err := closers[i](ctx); err != nil {
logger.Error(err, "failed to close", "i", i)
}
}
}
11 changes: 11 additions & 0 deletions runner/sidecar/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sidecar

import "sync"

var mu = sync.Mutex{}

func withLock(f func()) {
mu.Lock()
defer mu.Unlock()
f()
}
Loading

0 comments on commit 0fc71c2

Please sign in to comment.