@@ -51,7 +51,7 @@ public class DefaultStreamPoller implements StreamPoller {
5151 private ExecutorService processorThread ;
5252
5353 // initial pointer polling starts from, inclusive (set in the constructor or by an offset reset)
54- private IngestionShardPointer batchStartPointer ;
54+ private IngestionShardPointer initialBatchStartPointer ;
5555 private boolean includeBatchStartPointer = false ;
5656
5757 private ResetState resetState ;
@@ -104,7 +104,7 @@ public DefaultStreamPoller(
104104 this .consumer = Objects .requireNonNull (consumer );
105105 this .resetState = resetState ;
106106 this .resetValue = resetValue ;
107- this .batchStartPointer = startPointer ;
107+ this .initialBatchStartPointer = startPointer ;
108108 this .state = initialState ;
109109 this .persistedPointers = persistedPointers ;
110110 if (!this .persistedPointers .isEmpty ()) {
@@ -169,23 +169,23 @@ protected void startPoll() {
169169 if (resetState != ResetState .NONE ) {
170170 switch (resetState ) {
171171 case EARLIEST :
172- batchStartPointer = consumer .earliestPointer ();
173- logger .info ("Resetting offset by seeking to earliest offset {}" , batchStartPointer .asString ());
172+ initialBatchStartPointer = consumer .earliestPointer ();
173+ logger .info ("Resetting offset by seeking to earliest offset {}" , initialBatchStartPointer .asString ());
174174 break ;
175175 case LATEST :
176- batchStartPointer = consumer .latestPointer ();
177- logger .info ("Resetting offset by seeking to latest offset {}" , batchStartPointer .asString ());
176+ initialBatchStartPointer = consumer .latestPointer ();
177+ logger .info ("Resetting offset by seeking to latest offset {}" , initialBatchStartPointer .asString ());
178178 break ;
179179 case REWIND_BY_OFFSET :
180- batchStartPointer = consumer .pointerFromOffset (resetValue );
181- logger .info ("Resetting offset by seeking to offset {}" , batchStartPointer .asString ());
180+ initialBatchStartPointer = consumer .pointerFromOffset (resetValue );
181+ logger .info ("Resetting offset by seeking to offset {}" , initialBatchStartPointer .asString ());
182182 break ;
183183 case REWIND_BY_TIMESTAMP :
184- batchStartPointer = consumer .pointerFromTimestampMillis (Long .parseLong (resetValue ));
184+ initialBatchStartPointer = consumer .pointerFromTimestampMillis (Long .parseLong (resetValue ));
185185 logger .info (
186186 "Resetting offset by seeking to timestamp {}, corresponding offset {}" ,
187187 resetValue ,
188- batchStartPointer .asString ()
188+ initialBatchStartPointer .asString ()
189189 );
190190 break ;
191191 }
@@ -208,7 +208,8 @@ protected void startPoll() {
208208 List <IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message >> results ;
209209
210210 if (includeBatchStartPointer ) {
211- results = consumer .readNext (batchStartPointer , true , MAX_POLL_SIZE , POLL_TIMEOUT );
211+ results = consumer .readNext (initialBatchStartPointer , true , MAX_POLL_SIZE , POLL_TIMEOUT );
212+ includeBatchStartPointer = false ;
212213 } else {
213214 results = consumer .readNext (MAX_POLL_SIZE , POLL_TIMEOUT );
214215 }
@@ -219,38 +220,42 @@ protected void startPoll() {
219220 }
220221
221222 state = State .PROCESSING ;
222- // process the records
223- boolean firstInBatch = true ;
224- for (IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message > result : results ) {
225- if (firstInBatch ) {
226- // update the batch start pointer to the next batch
227- batchStartPointer = result .getPointer ();
228- firstInBatch = false ;
229- }
223+ processRecords (results );
224+ } catch (Throwable e ) {
225+ // Pause ingestion when an error is encountered while polling the streaming source.
226+ // Currently we do not have a good way to skip past the failing messages.
227+ // The user will have the option to manually update the offset and resume ingestion.
228+ // todo: support retry?
229+ logger .error ("Pausing ingestion. Fatal error occurred in polling the shard {}: {}" , consumer .getShardId (), e );
230+ pause ();
231+ }
232+ }
233+ }
230234
231- // check if the message is already processed
232- if (isProcessed (result .getPointer ())) {
233- logger .info ("Skipping message with pointer {} as it is already processed" , result .getPointer ().asString ());
234- continue ;
235- }
236- totalPolledCount .inc ();
237- blockingQueue .put (result );
238-
239- logger .debug (
240- "Put message {} with pointer {} to the blocking queue" ,
241- String .valueOf (result .getMessage ().getPayload ()),
242- result .getPointer ().asString ()
243- );
235+ private void processRecords (List <IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message >> results ) {
236+ for (IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message > result : results ) {
237+ try {
238+ // check if the message is already processed
239+ if (isProcessed (result .getPointer ())) {
240+ logger .info ("Skipping message with pointer {} as it is already processed" , result .getPointer ().asString ());
241+ continue ;
244242 }
245- // for future reads, we do not need to include the batch start pointer, and read from the last successful pointer.
246- includeBatchStartPointer = false ;
243+ totalPolledCount .inc ();
244+ blockingQueue .put (result );
245+
246+ logger .debug (
247+ "Put message {} with pointer {} to the blocking queue" ,
248+ String .valueOf (result .getMessage ().getPayload ()),
249+ result .getPointer ().asString ()
250+ );
247251 } catch (Throwable e ) {
248- logger .error ("Error in polling the shard {} : {}" , consumer .getShardId (), e );
252+ logger .error ("Error in processing a record. Shard {}, pointer {} : {}" , consumer .getShardId (), result . getPointer (), e );
249253 errorStrategy .handleError (e , IngestionErrorStrategy .ErrorStage .POLLING );
250254
251255 if (!errorStrategy .shouldIgnoreError (e , IngestionErrorStrategy .ErrorStage .POLLING )) {
252256 // Blocking error encountered. Pause poller to stop processing remaining updates.
253257 pause ();
258+ break ;
254259 }
255260 }
256261 }
@@ -328,9 +333,16 @@ public boolean isClosed() {
328333 return closed ;
329334 }
330335
336+ /**
337+ * Returns the pointer from which the poller should resume after a shard recovery. Polling and processing are
338+ * decoupled in this implementation, so the latest pointer tracked by the processor is the recovery/start point.
339+ * If the processor has not started tracking yet (its current pointer is null), the poller's initial batch start
340+ * pointer (the constructor's start pointer, or the pointer resolved by a reset in startPoll) is returned instead.
341+ */
331342 @ Override
332343 public IngestionShardPointer getBatchStartPointer () {
333- return batchStartPointer ;
344+ IngestionShardPointer currentShardPointer = processorRunnable .getCurrentShardPointer ();
345+ return currentShardPointer == null ? initialBatchStartPointer : currentShardPointer ;
334346 }
335347
336348 @ Override
0 commit comments