fix: failed to record sink status
alexec committed May 18, 2021
1 parent 3db2fbc commit 6dc28a2
Showing 1 changed file with 61 additions and 59 deletions.
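The underlying issue appears to be Go's loop-variable capture behaviour: closures in a for range loop share a single loop variable on the Go versions in use here. The callbacks registered inside "for i, source := range spec.Sources" and "for i, sink := range spec.Sinks" referred to source.Name and sink.Name directly, so every closure read the shared loop variable and, by the time it ran, recorded status under whatever name the last iteration left behind. The diff captures a per-iteration copy (sourceName := source.Name, sinkName := sink.Name) for the closures to use, renames the branch-scoped variables c, s, and k to x, and renames the cron scheduler from x to crn to free that name. The sketch below illustrates the pitfall and the fix; it is not code from the repository, and the sink type and variable names are illustrative only.

package main

import "fmt"

type sink struct{ Name string }

func main() {
    sinks := []sink{{Name: "a"}, {Name: "b"}, {Name: "c"}}

    // Buggy: each closure captures the shared loop variable s, so by the
    // time the closures run they all see the final element ("c").
    var broken []func()
    for _, s := range sinks {
        broken = append(broken, func() { fmt.Println(s.Name) })
    }
    for _, f := range broken {
        f() // prints "c", "c", "c" on Go versions before 1.22
    }

    // Fixed: take a per-iteration copy, as the commit does with
    // sinkName := sink.Name and sourceName := source.Name.
    var fixed []func()
    for _, s := range sinks {
        name := s.Name
        fixed = append(fixed, func() { fmt.Println(name) })
    }
    for _, f := range fixed {
        f() // prints "a", "b", "c"
    }
}

Go 1.22 later made range-loop variables per-iteration, which would also avoid the stale capture; at the time of this commit the explicit copy shown above was the idiomatic fix.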
runner/sidecar/sidecar.go: 61 additions & 59 deletions
@@ -157,34 +157,34 @@ func patchStepStatus(ctx context.Context) {
 func enrichSpec(ctx context.Context) error {
     secrets := kubernetesInterface.CoreV1().Secrets(namespace)
     for i, source := range spec.Sources {
-        if s := source.STAN; s != nil {
-            secret, err := secrets.Get(ctx, "dataflow-stan-"+s.Name, metav1.GetOptions{})
+        if x := source.STAN; x != nil {
+            secret, err := secrets.Get(ctx, "dataflow-stan-"+x.Name, metav1.GetOptions{})
             if err != nil {
                 if !apierr.IsNotFound(err) {
                     return err
                 }
             } else {
-                s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"]))
-                s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"]))
-                s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"]))
+                x.NATSURL = dfv1.StringOr(x.NATSURL, string(secret.Data["natsUrl"]))
+                x.ClusterID = dfv1.StringOr(x.ClusterID, string(secret.Data["clusterId"]))
+                x.SubjectPrefix = dfv1.SubjectPrefixOr(x.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"]))
             }
-            switch s.SubjectPrefix {
+            switch x.SubjectPrefix {
             case dfv1.SubjectPrefixNamespaceName:
-                s.Subject = fmt.Sprintf("%s.%s", namespace, s.Subject)
+                x.Subject = fmt.Sprintf("%s.%s", namespace, x.Subject)
             case dfv1.SubjectPrefixNamespacedPipelineName:
-                s.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, s.Subject)
+                x.Subject = fmt.Sprintf("%s.%s.%s", namespace, pipelineName, x.Subject)
             }
-            source.STAN = s
-        } else if k := source.Kafka; k != nil {
-            secret, err := secrets.Get(ctx, "dataflow-kafka-"+k.Name, metav1.GetOptions{})
+            source.STAN = x
+        } else if x := source.Kafka; x != nil {
+            secret, err := secrets.Get(ctx, "dataflow-kafka-"+x.Name, metav1.GetOptions{})
             if err != nil {
                 if !apierr.IsNotFound(err) {
                     return err
                 }
             } else {
-                k.Brokers = dfv1.StringsOr(k.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
+                x.Brokers = dfv1.StringsOr(x.Brokers, strings.Split(string(secret.Data["brokers"]), ","))
             }
-            source.Kafka = k
+            source.Kafka = x
         }
         spec.Sources[i] = source
     }
@@ -230,48 +230,49 @@ func enrichSpec(ctx context.Context) error {
 }
 
 func connectSources(ctx context.Context, toMain func([]byte) error) error {
-    x := cron.New(
+    crn := cron.New(
         cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
         cron.WithChain(cron.Recover(logger)),
     )
-    go x.Run()
+    go crn.Run()
     closers = append(closers, func() error {
-        _ = x.Stop()
+        _ = crn.Stop()
         return nil
     })
     for i, source := range spec.Sources {
-        if c := source.Cron; c != nil {
-            logger.Info("connecting to source", "type", "cron", "schedule", c.Schedule)
-            _, err := x.AddFunc(c.Schedule, func() {
-                data := []byte(time.Now().Format(c.Layout))
+        sourceName := source.Name
+        if x := source.Cron; x != nil {
+            logger.Info("connecting to source", "type", "cron", "schedule", x.Schedule)
+            _, err := crn.AddFunc(x.Schedule, func() {
+                data := []byte(time.Now().Format(x.Layout))
                 debug.Info("◷ cron →", "m", printable(data))
-                withLock(func() { sourceStatues.Set(source.Name, replica, printable(data)) })
+                withLock(func() { sourceStatues.Set(sourceName, replica, printable(data)) })
                 if err := toMain(data); err != nil {
                     logger.Error(err, "⚠ cron →")
-                    withLock(func() { sourceStatues.IncErrors(source.Name, replica, err) })
+                    withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
                 } else {
-                    debug.Info("✔ cron → ", "schedule", c.Schedule)
+                    debug.Info("✔ cron → ", "schedule", x.Schedule)
                 }
             })
             if err != nil {
-                return fmt.Errorf("failed to schedule cron %q: %w", c.Schedule, err)
+                return fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err)
             }
-        } else if s := source.STAN; s != nil {
+        } else if x := source.STAN; x != nil {
             clientID := fmt.Sprintf("%s-%s-%d-source-%d", pipelineName, spec.Name, replica, i)
-            logger.Info("connecting to source", "type", "stan", "url", s.NATSURL, "clusterID", s.ClusterID, "clientID", clientID, "subject", s.Subject)
-            sc, err := stan.Connect(s.ClusterID, clientID, stan.NatsURL(s.NATSURL))
+            logger.Info("connecting to source", "type", "stan", "url", x.NATSURL, "clusterID", x.ClusterID, "clientID", clientID, "subject", x.Subject)
+            sc, err := stan.Connect(x.ClusterID, clientID, stan.NatsURL(x.NATSURL))
             if err != nil {
-                return fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", s.NATSURL, s.ClusterID, clientID, s.Subject, err)
+                return fmt.Errorf("failed to connect to stan url=%s clusterID=%s clientID=%s subject=%s: %w", x.NATSURL, x.ClusterID, clientID, x.Subject, err)
             }
             closers = append(closers, sc.Close)
-            if sub, err := sc.QueueSubscribe(s.Subject, fmt.Sprintf("%s-%s", pipelineName, spec.Name), func(m *stan.Msg) {
+            if sub, err := sc.QueueSubscribe(x.Subject, fmt.Sprintf("%s-%s", pipelineName, spec.Name), func(m *stan.Msg) {
                 debug.Info("◷ stan →", "m", printable(m.Data))
-                withLock(func() { sourceStatues.Set(source.Name, replica, printable(m.Data)) })
+                withLock(func() { sourceStatues.Set(sourceName, replica, printable(m.Data)) })
                 if err := toMain(m.Data); err != nil {
                     logger.Error(err, "⚠ stan →")
-                    withLock(func() { sourceStatues.IncErrors(source.Name, replica, err) })
+                    withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
                 } else {
-                    debug.Info("✔ stan → ", "subject", s.Subject)
+                    debug.Info("✔ stan → ", "subject", x.Subject)
                 }
             }, stan.DeliverAllAvailable(), stan.DurableName(clientID)); err != nil {
                 return fmt.Errorf("failed to subscribe: %w", err)
@@ -281,81 +281,81 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
                     defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
                     for {
                         if pending, _, err := sub.Pending(); err != nil {
-                            logger.Error(err, "failed to get pending", "subject", s.Subject)
+                            logger.Error(err, "failed to get pending", "subject", x.Subject)
                         } else {
-                            debug.Info("setting pending", "subject", s.Subject, "pending", pending)
-                            withLock(func() { sourceStatues.SetPending(source.Name, uint64(pending)) })
+                            debug.Info("setting pending", "subject", x.Subject, "pending", pending)
+                            withLock(func() { sourceStatues.SetPending(sourceName, uint64(pending)) })
                         }
                         time.Sleep(updateInterval)
                     }
                 }()
             }
-        } else if k := source.Kafka; k != nil {
-            logger.Info("connecting kafka source", "type", "kafka", "brokers", k.Brokers, "topic", k.Topic)
-            config, err := newKafkaConfig(k)
+        } else if x := source.Kafka; x != nil {
+            logger.Info("connecting kafka source", "type", "kafka", "brokers", x.Brokers, "topic", x.Topic)
+            config, err := newKafkaConfig(x)
             if err != nil {
                 return err
             }
             config.Consumer.Return.Errors = true
             config.Consumer.Offsets.Initial = sarama.OffsetNewest
-            client, err := sarama.NewClient(k.Brokers, config) // I am not giving any configuration
+            client, err := sarama.NewClient(x.Brokers, config) // I am not giving any configuration
             if err != nil {
                 return err
             }
             closers = append(closers, client.Close)
-            group, err := sarama.NewConsumerGroup(k.Brokers, pipelineName+"-"+spec.Name, config)
+            group, err := sarama.NewConsumerGroup(x.Brokers, pipelineName+"-"+spec.Name, config)
             if err != nil {
                 return fmt.Errorf("failed to create kafka consumer group: %w", err)
             }
             closers = append(closers, group.Close)
-            handler := &handler{source.Name, toMain, 0}
+            handler := &handler{sourceName, toMain, 0}
             go func() {
                 defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
-                if err := group.Consume(ctx, []string{k.Topic}, handler); err != nil {
+                if err := group.Consume(ctx, []string{x.Topic}, handler); err != nil {
                     logger.Error(err, "failed to create kafka consumer")
                 }
             }()
             go func() {
                 defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
                 for {
-                    if partitions, err := client.Partitions(k.Topic); err != nil {
-                        logger.Error(err, "failed to get offset", "topic", k.Topic)
+                    if partitions, err := client.Partitions(x.Topic); err != nil {
+                        logger.Error(err, "failed to get offset", "topic", x.Topic)
                     } else {
                         var newestOffset int64
                         for _, p := range partitions {
-                            v, err := client.GetOffset(k.Topic, p, sarama.OffsetNewest)
+                            v, err := client.GetOffset(x.Topic, p, sarama.OffsetNewest)
                             if err != nil {
-                                logger.Error(err, "failed to get offset", "topic", k.Topic)
+                                logger.Error(err, "failed to get offset", "topic", x.Topic)
                             } else if v > newestOffset {
                                 newestOffset = v
                             }
                         }
                         if newestOffset > handler.offset && handler.offset > 0 { // zero implies we've not processed a message yet
                             pending := uint64(newestOffset - handler.offset)
-                            debug.Info("setting pending", "type", "kafka", "topic", k.Topic, "pending", pending)
-                            withLock(func() { sourceStatues.SetPending(source.Name, pending) })
+                            debug.Info("setting pending", "type", "kafka", "topic", x.Topic, "pending", pending)
+                            withLock(func() { sourceStatues.SetPending(sourceName, pending) })
                         }
                     }
                     time.Sleep(updateInterval)
                 }
             }()
         } else if x := source.HTTP; x != nil {
             logger.Info("connecting source", "type", "http")
-            http.HandleFunc("/sources/"+source.Name, func(w http.ResponseWriter, r *http.Request) {
+            http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) {
                 println("host", r.Host)
                 data, err := ioutil.ReadAll(r.Body)
                 if err != nil {
                     logger.Error(err, "⚠ http →")
                     w.WriteHeader(500)
-                    withLock(func() { sourceStatues.IncErrors(source.Name, replica, err) })
+                    withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
                     return
                 }
                 debug.Info("◷ http →", "m", printable(data))
-                withLock(func() { sourceStatues.Set(source.Name, replica, printable(data)) })
+                withLock(func() { sourceStatues.Set(sourceName, replica, printable(data)) })
                 if err := toMain(data); err != nil {
                     logger.Error(err, "⚠ http →")
                     w.WriteHeader(500)
-                    withLock(func() { sourceStatues.IncErrors(source.Name, replica, err) })
+                    withLock(func() { sourceStatues.IncErrors(sourceName, replica, err) })
                 } else {
                     debug.Info("✔ http ")
                     w.WriteHeader(200)
@@ -504,6 +504,7 @@ func connectOut(toSink func([]byte) error) {
 func connectSink() (func([]byte) error, error) {
     var toSinks []func([]byte) error
     for i, sink := range spec.Sinks {
+        sinkName := sink.Name
         if s := sink.STAN; s != nil {
             clientID := fmt.Sprintf("%s-%s-%d-sink-%d", pipelineName, spec.Name, replica, i)
             logger.Info("connecting sink", "type", "stan", "url", s.NATSURL, "clusterID", s.ClusterID, "clientID", clientID, "subject", s.Subject)
@@ -513,11 +515,11 @@ func connectSink() (func([]byte) error, error) {
             }
             closers = append(closers, sc.Close)
             toSinks = append(toSinks, func(m []byte) error {
-                withLock(func() { sinkStatues.Set(sink.Name, replica, printable(m)) })
+                withLock(func() { sinkStatues.Set(sinkName, replica, printable(m)) })
                 debug.Info("◷ → stan", "subject", s.Subject, "m", printable(m))
                 err := sc.Publish(s.Subject, m)
                 if err != nil {
-                    withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) })
+                    withLock(func() { sinkStatues.IncErrors(sinkName, replica, err) })
                 }
                 return err
             })
@@ -534,28 +536,28 @@ func connectSink() (func([]byte) error, error) {
             }
             closers = append(closers, producer.Close)
             toSinks = append(toSinks, func(m []byte) error {
-                withLock(func() { sinkStatues.Set(sink.Name, replica, printable(m)) })
+                withLock(func() { sinkStatues.Set(sinkName, replica, printable(m)) })
                 debug.Info("◷ → kafka", "topic", k.Topic, "m", printable(m))
                 _, _, err := producer.SendMessage(&sarama.ProducerMessage{
                     Topic: k.Topic,
                     Value: sarama.ByteEncoder(m),
                 })
                 if err != nil {
-                    withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) })
+                    withLock(func() { sinkStatues.IncErrors(sinkName, replica, err) })
                 }
                 return err
             })
         } else if l := sink.Log; l != nil {
             logger.Info("connecting sink", "type", "log")
             toSinks = append(toSinks, func(m []byte) error {
-                withLock(func() { sinkStatues.Set(sink.Name, replica, printable(m)) })
+                withLock(func() { sinkStatues.Set(sinkName, replica, printable(m)) })
                 logger.Info(string(m), "type", "log")
                 return nil
             })
         } else if x := sink.HTTP; x != nil {
             logger.Info("connecting sink", "type", "http")
             toSinks = append(toSinks, func(m []byte) error {
-                withLock(func() { sinkStatues.Set(sink.Name, replica, printable(m)) })
+                withLock(func() { sinkStatues.Set(sinkName, replica, printable(m)) })
                 err := func() error {
                     if resp, err := http.Post(x.URL, "application/octet-stream", bytes.NewBuffer(m)); err != nil {
                         return err
@@ -566,7 +568,7 @@ func connectSink() (func([]byte) error, error) {
                    }
                }()
                if err != nil {
-                    withLock(func() { sinkStatues.IncErrors(sink.Name, replica, err) })
+                    withLock(func() { sinkStatues.IncErrors(sinkName, replica, err) })
                }
                return err
            })