Skip to content

Commit

Permalink
fix: enhanced shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 21, 2021
1 parent 6eebb68 commit de8a19f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 44 deletions.
151 changes: 109 additions & 42 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
logger = zap.New()
debug = logger.V(6)
trace = logger.V(8)
closers []func() error
closers []func(ctx context.Context) error
dynamicInterface dynamic.Interface
kubernetesInterface kubernetes.Interface
updateInterval time.Duration
Expand All @@ -62,14 +62,6 @@ func init() {
}

func Exec(ctx context.Context) error {
defer func() {
for i := len(closers) - 1; i >= 0; i-- {
if err := closers[i](); err != nil {
logger.Error(err, "failed to close")
}
}
}()
defer patchStepStatus(context.Background()) // always patch a final status, we use a new context in case we've been SIGTERM

restConfig := ctrl.GetConfigOrDie()
dynamicInterface = dynamic.NewForConfigOrDie(restConfig)
Expand All @@ -78,6 +70,13 @@ func Exec(ctx context.Context) error {
util2.MustUnJSON(os.Getenv(dfv1.EnvStepSpec), &spec)
util2.MustUnJSON(os.Getenv(dfv1.EnvStepStatus), &status)

if status.SourceStatuses == nil {
status.SourceStatuses = dfv1.SourceStatuses{}
}
if status.SinkStatues == nil {
status.SinkStatues = dfv1.SinkStatuses{}
}

logger.Info("status", "status", util2.MustJSON(status))

lastStatus = *status.DeepCopy()
Expand All @@ -99,6 +98,23 @@ func Exec(ctx context.Context) error {

logger.Info("sidecar config", "stepName", spec.Name, "pipelineName", pipelineName, "replica", replica, "updateInterval", updateInterval.String())

defer func() {
logger.Info("closing closers", "len", len(closers))
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
for i := len(closers) - 1; i >= 0; i-- {
logger.Info("closing", "i", i)
if err := closers[i](ctx); err != nil {
logger.Error(err, "failed to close", "i", i)
}
}
}()

closers = append(closers, func(ctx context.Context) error {
patchStepStatus(ctx)
return nil
})

toSink, err := connectSink()
if err != nil {
return err
Expand Down Expand Up @@ -137,7 +153,7 @@ func patchStepStatus(ctx context.Context) {
[]byte(patch),
metav1.PatchOptions{},
"status",
); err != nil {
); util2.IgnoreNotFound(err) != nil { // the step can be deleted before the pod
logger.Error(err, "failed to patch step status")
}
lastStatus = *status.DeepCopy()
Expand Down Expand Up @@ -226,8 +242,9 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
cron.WithChain(cron.Recover(logger)),
)
go crn.Run()
closers = append(closers, func() error {
_ = crn.Stop()
closers = append(closers, func(ctx context.Context) error {
logger.Info("stopping cron")
<-crn.Stop().Done()
return nil
})
sources := make(map[string]bool)
Expand All @@ -244,7 +261,6 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
f := func(msg []byte) error {
debug.Info("◷ →", "source", sourceName, "msg", printable(msg))
rateCounter.Incr(1)

withLock(func() {
status.SourceStatuses.Set(sourceName, replica, printable(msg), uint64(rateCounter.Rate()/int64(updateInterval/time.Second)))
})
Expand All @@ -268,14 +284,20 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if err != nil {
return fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err)
}
closers = append(closers, sc.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing stan connection", "source", sourceName)
return sc.Close()
})
for i := 0; i < int(x.Parallel); i++ {
if sub, err := sc.QueueSubscribe(x.Subject, fmt.Sprintf("%s-%s", pipelineName, spec.Name), func(m *stan.Msg) {
_ = f(m.Data)
}, stan.DurableName(clientID)); err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
} else {
closers = append(closers, sub.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing stan subscription", "source", sourceName)
return sub.Close()
})
if i == 0 && replica == 0 {
go wait.JitterUntil(func() {
if pending, _, err := sub.Pending(); err != nil {
Expand All @@ -299,13 +321,19 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
if err != nil {
return err
}
closers = append(closers, client.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing kafka client", "source", sourceName)
return client.Close()
})
for i := 0; i < int(x.Parallel); i++ {
group, err := sarama.NewConsumerGroup(x.Brokers, pipelineName+"-"+spec.Name, config)
if err != nil {
return fmt.Errorf("failed to create kafka consumer group: %w", err)
}
closers = append(closers, group.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing kafka consumer group", "source", sourceName)
return group.Close()
})
handler := &handler{f: f}
go wait.JitterUntil(func() {
if err := group.Consume(ctx, []string{x.Topic}, handler); err != nil {
Expand Down Expand Up @@ -380,43 +408,38 @@ func connectTo(ctx context.Context) (func([]byte) error, error) {
if err != nil {
return nil, fmt.Errorf("failed to open input FIFO: %w", err)
}
closers = append(closers, fifo.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing FIFO")
return fifo.Close()
})
return func(data []byte) error {
trace.Info("◷ source → fifo")
if _, err := fifo.Write(data); err != nil {
return fmt.Errorf("failed to write message from source to main via FIFO: %w", err)
return fmt.Errorf("failed to send to main: %w", err)
}
if _, err := fifo.Write([]byte("\n")); err != nil {
return fmt.Errorf("failed to write new line from source to main via FIFO: %w", err)
return fmt.Errorf("ffailed to send to main: %w", err)
}
trace.Info("✔ source → fifo")
return nil
}, nil
} else if in.HTTP != nil {
logger.Info("HTTP in interface configured")
logger.Info("waiting for HTTP in interface to be ready")
LOOP:
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode == 200 {
logger.Info("HTTP in interface ready")
break LOOP
}
time.Sleep(3 * time.Second)
}
if err := waitReady(ctx); err != nil {
return nil, err
}
closers = append(closers, func(ctx context.Context) error {
return waitUnready(ctx)
})
return func(data []byte) error {
trace.Info("◷ source → http")
resp, err := http.Post("http://localhost:8080/messages", "application/octet-stream", bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("failed to send message to main via HTTP: %w", err)
return fmt.Errorf("failed to send to main: %w", err)
}
if resp.StatusCode >= 300 {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("failed to send message to main via HTTP: %q %q", resp.Status, body)
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
}
trace.Info("✔ source → http")
return nil
Expand All @@ -426,6 +449,38 @@ func connectTo(ctx context.Context) (func([]byte) error, error) {
}
}

func waitReady(ctx context.Context) error {
logger.Info("waiting for HTTP in interface to be ready")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode == 200 {
logger.Info("HTTP in interface ready")
return nil
}
time.Sleep(3 * time.Second)
}
}
}

func waitUnready(ctx context.Context) error {
logger.Info("waiting for HTTP in interface to be unready")
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode != 200 {
logger.Info("HTTP in interface unready")
return nil
}
time.Sleep(3 * time.Second)
}
}
}

func connectOut(toSink func([]byte) error) {
logger.Info("FIFO out interface configured")
go func() {
Expand All @@ -435,7 +490,10 @@ func connectOut(toSink func([]byte) error) {
if err != nil {
return fmt.Errorf("failed to open output FIFO: %w", err)
}
defer fifo.Close()
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing out FIFO")
return fifo.Close()
})
logger.Info("opened output FIFO")
scanner := bufio.NewScanner(fifo)
for scanner.Scan() {
Expand Down Expand Up @@ -474,14 +532,17 @@ func connectOut(toSink func([]byte) error) {
w.WriteHeader(200)
}
})
server := &http.Server{Addr: ":3569"}
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing HTTP server")
return server.Shutdown(ctx)
})
go func() {
defer runtimeutil.HandleCrash()
logger.Info("starting HTTP server")
err := http.ListenAndServe(":3569", nil)
if err != nil {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error(err, "failed to listen-and-server")
os.Exit(1)
}
logger.Info("HTTP server shutdown")
}()
}

Expand All @@ -499,7 +560,10 @@ func connectSink() (func([]byte) error, error) {
if err != nil {
return nil, fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err)
}
closers = append(closers, sc.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing stan connection", "sink", sinkName)
return sc.Close()
})
sinks[sinkName] = func(msg []byte) error { return sc.Publish(x.Subject, msg) }
} else if x := sink.Kafka; x != nil {
config, err := newKafkaConfig(x)
Expand All @@ -511,7 +575,10 @@ func connectSink() (func([]byte) error, error) {
if err != nil {
return nil, fmt.Errorf("failed to create kafka producer: %w", err)
}
closers = append(closers, producer.Close)
closers = append(closers, func(ctx context.Context) error {
logger.Info("closing stan producer", "sink", sinkName)
return producer.Close()
})
sinks[sinkName] = func(msg []byte) error {
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: x.Topic,
Expand Down
6 changes: 4 additions & 2 deletions runner/util/do.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ func Do(ctx context.Context, fn func(msg []byte) ([][]byte, error)) error {
w.WriteHeader(200)
})

server := &http.Server{Addr: ":8080"}

go func() {
defer runtimeutil.HandleCrash()
if err := http.ListenAndServe(":8080", nil); err != nil {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}()

<-ctx.Done()

return nil
return server.Shutdown(context.Background())
}

0 comments on commit de8a19f

Please sign in to comment.