diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go index 2f257d6f..a869c3d4 100644 --- a/runner/sidecar/source/kafka/kafka.go +++ b/runner/sidecar/source/kafka/kafka.go @@ -35,10 +35,11 @@ func New(ctx context.Context, pipelineName, stepName, sourceName string, x dfv1. TLS: t, } reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: x.Brokers, - Dialer: dialer, - GroupID: groupName, - Topic: x.Topic, + Brokers: x.Brokers, + Dialer: dialer, + GroupID: groupName, + Topic: x.Topic, + StartOffset: kafka.LastOffset, }) go wait.JitterUntil(func() { ctx := context.Background()