diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index 013acc05..fdb02bff 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -38,7 +38,7 @@ var ( logger = zap.New() debug = logger.V(6) trace = logger.V(8) - closers []func() error + closers []func(ctx context.Context) error dynamicInterface dynamic.Interface kubernetesInterface kubernetes.Interface updateInterval time.Duration @@ -62,14 +62,6 @@ func init() { } func Exec(ctx context.Context) error { - defer func() { - for i := len(closers) - 1; i >= 0; i-- { - if err := closers[i](); err != nil { - logger.Error(err, "failed to close") - } - } - }() - defer patchStepStatus(context.Background()) // always patch a final status, we use a new context in case we've been SIGTERM restConfig := ctrl.GetConfigOrDie() dynamicInterface = dynamic.NewForConfigOrDie(restConfig) @@ -78,6 +70,13 @@ func Exec(ctx context.Context) error { util2.MustUnJSON(os.Getenv(dfv1.EnvStepSpec), &spec) util2.MustUnJSON(os.Getenv(dfv1.EnvStepStatus), &status) + if status.SourceStatuses == nil { + status.SourceStatuses = dfv1.SourceStatuses{} + } + if status.SinkStatues == nil { + status.SinkStatues = dfv1.SinkStatuses{} + } + logger.Info("status", "status", util2.MustJSON(status)) lastStatus = *status.DeepCopy() @@ -99,6 +98,23 @@ func Exec(ctx context.Context) error { logger.Info("sidecar config", "stepName", spec.Name, "pipelineName", pipelineName, "replica", replica, "updateInterval", updateInterval.String()) + defer func() { + logger.Info("closing closers", "len", len(closers)) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + for i := len(closers) - 1; i >= 0; i-- { + logger.Info("closing", "i", i) + if err := closers[i](ctx); err != nil { + logger.Error(err, "failed to close", "i", i) + } + } + }() + + closers = append(closers, func(ctx context.Context) error { + patchStepStatus(ctx) + return nil + }) + toSink, err := connectSink() if err != nil { return err @@ -137,7 +153,7 @@ func patchStepStatus(ctx context.Context) { []byte(patch), metav1.PatchOptions{}, "status", - ); err != nil { + ); util2.IgnoreNotFound(err) != nil { // the step can be deleted before the pod logger.Error(err, "failed to patch step status") } lastStatus = *status.DeepCopy() @@ -226,8 +242,9 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { cron.WithChain(cron.Recover(logger)), ) go crn.Run() - closers = append(closers, func() error { - _ = crn.Stop() + closers = append(closers, func(ctx context.Context) error { + logger.Info("stopping cron") + <-crn.Stop().Done() return nil }) sources := make(map[string]bool) @@ -244,7 +261,6 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { f := func(msg []byte) error { debug.Info("◷ →", "source", sourceName, "msg", printable(msg)) rateCounter.Incr(1) - withLock(func() { status.SourceStatuses.Set(sourceName, replica, printable(msg), uint64(rateCounter.Rate()/int64(updateInterval/time.Second))) }) @@ -268,14 +284,20 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { if err != nil { return fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err) } - closers = append(closers, sc.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing stan connection", "source", sourceName) + return sc.Close() + }) for i := 0; i < int(x.Parallel); i++ { if sub, err := sc.QueueSubscribe(x.Subject, fmt.Sprintf("%s-%s", pipelineName, spec.Name), func(m *stan.Msg) { _ = f(m.Data) }, stan.DurableName(clientID)); err != nil { return fmt.Errorf("failed to subscribe: %w", err) } else { - closers = append(closers, sub.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing stan subscription", "source", sourceName) + return sub.Close() + }) if i == 0 && replica == 0 { go wait.JitterUntil(func() { if pending, _, err := sub.Pending(); err != nil { @@ -299,13 +321,19 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { if err != nil { return err } - closers = append(closers, client.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing kafka client", "source", sourceName) + return client.Close() + }) for i := 0; i < int(x.Parallel); i++ { group, err := sarama.NewConsumerGroup(x.Brokers, pipelineName+"-"+spec.Name, config) if err != nil { return fmt.Errorf("failed to create kafka consumer group: %w", err) } - closers = append(closers, group.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing kafka consumer group", "source", sourceName) + return group.Close() + }) handler := &handler{f: f} go wait.JitterUntil(func() { if err := group.Consume(ctx, []string{x.Topic}, handler); err != nil { @@ -380,43 +408,38 @@ func connectTo(ctx context.Context) (func([]byte) error, error) { if err != nil { return nil, fmt.Errorf("failed to open input FIFO: %w", err) } - closers = append(closers, fifo.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing FIFO") + return fifo.Close() + }) return func(data []byte) error { trace.Info("◷ source → fifo") if _, err := fifo.Write(data); err != nil { - return fmt.Errorf("failed to write message from source to main via FIFO: %w", err) + return fmt.Errorf("failed to send to main: %w", err) } if _, err := fifo.Write([]byte("\n")); err != nil { - return fmt.Errorf("failed to write new line from source to main via FIFO: %w", err) + return fmt.Errorf("ffailed to send to main: %w", err) } trace.Info("✔ source → fifo") return nil }, nil } else if in.HTTP != nil { logger.Info("HTTP in interface configured") - logger.Info("waiting for HTTP in interface to be ready") - LOOP: - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode == 200 { - logger.Info("HTTP in interface ready") - break LOOP - } - time.Sleep(3 * time.Second) - } + if err := waitReady(ctx); err != nil { + return nil, err } + closers = append(closers, func(ctx context.Context) error { + return waitUnready(ctx) + }) return func(data []byte) error { trace.Info("◷ source → http") resp, err := http.Post("http://localhost:8080/messages", "application/octet-stream", bytes.NewBuffer(data)) if err != nil { - return fmt.Errorf("failed to send message to main via HTTP: %w", err) + return fmt.Errorf("failed to send to main: %w", err) } if resp.StatusCode >= 300 { body, _ := ioutil.ReadAll(resp.Body) - return fmt.Errorf("failed to send message to main via HTTP: %q %q", resp.Status, body) + return fmt.Errorf("failed to send to main: %q %q", resp.Status, body) } trace.Info("✔ source → http") return nil @@ -426,6 +449,38 @@ func connectTo(ctx context.Context) (func([]byte) error, error) { } } +func waitReady(ctx context.Context) error { + logger.Info("waiting for HTTP in interface to be ready") + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode == 200 { + logger.Info("HTTP in interface ready") + return nil + } + time.Sleep(3 * time.Second) + } + } +} + +func waitUnready(ctx context.Context) error { + logger.Info("waiting for HTTP in interface to be unready") + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode != 200 { + logger.Info("HTTP in interface unready") + return nil + } + time.Sleep(3 * time.Second) + } + } +} + func connectOut(toSink func([]byte) error) { logger.Info("FIFO out interface configured") go func() { @@ -435,7 +490,10 @@ func connectOut(toSink func([]byte) error) { if err != nil { return fmt.Errorf("failed to open output FIFO: %w", err) } - defer fifo.Close() + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing out FIFO") + return fifo.Close() + }) logger.Info("opened output FIFO") scanner := bufio.NewScanner(fifo) for scanner.Scan() { @@ -474,14 +532,17 @@ func connectOut(toSink func([]byte) error) { w.WriteHeader(200) } }) + server := &http.Server{Addr: ":3569"} + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing HTTP server") + return server.Shutdown(ctx) + }) go func() { - defer runtimeutil.HandleCrash() logger.Info("starting HTTP server") - err := http.ListenAndServe(":3569", nil) - if err != nil { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error(err, "failed to listen-and-server") - os.Exit(1) } + logger.Info("HTTP server shutdown") }() } @@ -499,7 +560,10 @@ func connectSink() (func([]byte) error, error) { if err != nil { return nil, fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err) } - closers = append(closers, sc.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing stan connection", "sink", sinkName) + return sc.Close() + }) sinks[sinkName] = func(msg []byte) error { return sc.Publish(x.Subject, msg) } } else if x := sink.Kafka; x != nil { config, err := newKafkaConfig(x) @@ -511,7 +575,10 @@ func connectSink() (func([]byte) error, error) { if err != nil { return nil, fmt.Errorf("failed to create kafka producer: %w", err) } - closers = append(closers, producer.Close) + closers = append(closers, func(ctx context.Context) error { + logger.Info("closing stan producer", "sink", sinkName) + return producer.Close() + }) sinks[sinkName] = func(msg []byte) error { _, _, err := producer.SendMessage(&sarama.ProducerMessage{ Topic: x.Topic, diff --git a/runner/util/do.go b/runner/util/do.go index 68c0af3e..8b2fcbef 100644 --- a/runner/util/do.go +++ b/runner/util/do.go @@ -52,14 +52,16 @@ func Do(ctx context.Context, fn func(msg []byte) ([][]byte, error)) error { w.WriteHeader(200) }) + server := &http.Server{Addr: ":8080"} + go func() { defer runtimeutil.HandleCrash() - if err := http.ListenAndServe(":8080", nil); err != nil { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { panic(err) } }() <-ctx.Done() - return nil + return server.Shutdown(context.Background()) }