Skip to content

Commit

Permalink
feat: only check Kafka partition for pending
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 20, 2021
1 parent d3e02ea commit e17f961
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
4 changes: 3 additions & 1 deletion runner/sidecar/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ import (
// handler implements sarama.ConsumerGroupHandler: it forwards each consumed
// Kafka message to the main container and records how far it has read, so the
// pending-message count can be computed from the newest broker offset.
type handler struct {
	name         string             // source name; key used when recording status/errors
	sourceToMain func([]byte) error // delivers one message payload to the main container
	partition    int32              // partition of the last successfully processed message
	offset       int64              // offset of the last successfully processed message; zero means none processed yet
}

// Setup satisfies sarama.ConsumerGroupHandler; this handler needs no
// per-session initialization, so it always succeeds.
func (*handler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup satisfies sarama.ConsumerGroupHandler; there is no per-session
// state to release, so it always succeeds.
func (*handler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for m := range claim.Messages() {
debug.Info("◷ kafka →", "m", printable(m.Value), "offset", m.Offset)
debug.Info("◷ kafka →", "m", printable(m.Value), "offset", m.Offset, "partition", m.Partition)
withLock(func() { sourceStatues.Set(h.name, replica, printable(m.Value)) })
if err := h.sourceToMain(m.Value); err != nil {
withLock(func() { sourceStatues.IncErrors(h.name, replica, err) })
debug.Error(err, "⚠ kafka →")
} else {
debug.Info("✔ kafka →")
h.partition = m.Partition
h.offset = m.Offset
sess.MarkMessage(m, "")
}
Expand Down
26 changes: 10 additions & 16 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
return fmt.Errorf("failed to create kafka consumer group: %w", err)
}
closers = append(closers, group.Close)
handler := &handler{sourceName, toMain, 0}
handler := &handler{sourceName, toMain, 0, 0}
go func() {
defer runtimeutil.HandleCrash()
if err := group.Consume(ctx, []string{x.Topic}, handler); err != nil {
Expand All @@ -322,23 +322,17 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
go func() {
defer runtimeutil.HandleCrash()
for {
if partitions, err := client.Partitions(x.Topic); err != nil {
logger.Error(err, "failed to get offset", "topic", x.Topic)
} else {
var newestOffset int64
for _, p := range partitions {
v, err := client.GetOffset(x.Topic, p, sarama.OffsetNewest)
if handler.offset > 0 {
withLock(func() {
newestOffset, err := client.GetOffset(x.Topic, handler.partition, sarama.OffsetNewest)
if err != nil {
logger.Error(err, "failed to get offset", "topic", x.Topic)
} else if v > newestOffset {
newestOffset = v
} else if newestOffset > handler.offset { // zero implies we've not processed a message yet
pending := uint64(newestOffset - handler.offset)
debug.Info("setting pending", "type", "kafka", "topic", x.Topic, "pending", pending)
sourceStatues.SetPending(sourceName, pending)
}
}
if newestOffset > handler.offset && handler.offset > 0 { // zero implies we've not processed a message yet
pending := uint64(newestOffset - handler.offset)
debug.Info("setting pending", "type", "kafka", "topic", x.Topic, "pending", pending)
withLock(func() { sourceStatues.SetPending(sourceName, pending) })
}
})
}
time.Sleep(updateInterval)
}
Expand Down Expand Up @@ -516,7 +510,7 @@ func connectSink() (func([]byte) error, error) {
for _, sink := range spec.Sinks {
sinkName := sink.Name
if s := sink.STAN; s != nil {
clientID := fmt.Sprintf("%s-%s-%d-sink-%d", pipelineName, spec.Name, sinkName)
clientID := fmt.Sprintf("%s-%s-%s", pipelineName, spec.Name, sinkName)
logger.Info("connecting sink", "type", "stan", "url", s.NATSURL, "clusterID", s.ClusterID, "clientID", clientID, "subject", s.Subject)
sc, err := stan.Connect(s.ClusterID, clientID, stan.NatsURL(s.NATSURL))
if err != nil {
Expand Down

0 comments on commit e17f961

Please sign in to comment.