From 4cdf4bb117833fee39872b3740053a91e32ab038 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Fri, 16 Jul 2021 12:57:03 -0700 Subject: [PATCH] fix: Kafka should start at LastOffset by default --- runner/sidecar/source/kafka/kafka.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go index 2f257d6f..a869c3d4 100644 --- a/runner/sidecar/source/kafka/kafka.go +++ b/runner/sidecar/source/kafka/kafka.go @@ -35,10 +35,11 @@ func New(ctx context.Context, pipelineName, stepName, sourceName string, x dfv1. TLS: t, } reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: x.Brokers, - Dialer: dialer, - GroupID: groupName, - Topic: x.Topic, + Brokers: x.Brokers, + Dialer: dialer, + GroupID: groupName, + Topic: x.Topic, + StartOffset: kafka.LastOffset, }) go wait.JitterUntil(func() { ctx := context.Background()