diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java index 098ebe39452e3..771ba05b54f9d 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java @@ -313,6 +313,7 @@ public void run() { errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING); boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException; if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) { + logDroppedMessage(shardUpdateMessage); shardUpdateMessage = null; retryCount = 0; messageProcessorMetrics.failedMessageDroppedCounter.inc(); @@ -360,6 +361,13 @@ public void close() { closed = true; } + private void logDroppedMessage(ShardUpdateMessage shardUpdateMessage) { + String id = shardUpdateMessage.autoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP + ? (String) shardUpdateMessage.parsedPayloadMap().get(ID) + : "null"; + logger.warn("Exhausted retries, dropping message: _id:{}, pointer:{}", id, shardUpdateMessage.pointer().asString()); + } + /** * Tracks MessageProcessor metrics. */ diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java index a5c15690ae93b..3a58a2f20742c 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/MessageProcessorTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.pollingingest; import org.opensearch.action.DocWriteRequest; +import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.FakeIngestionSource; @@ -23,6 +24,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; @@ -240,7 +242,7 @@ public void testMessageRetrySuccess() throws Exception { thread.interrupt(); } - public void testMessageRetryFail() throws Exception { + public void testDropPolicyMessageRetryFail() throws Exception { MessageProcessorRunnable.MessageProcessor processor = mock(MessageProcessorRunnable.MessageProcessor.class); DropIngestionErrorStrategy errorStrategy = new DropIngestionErrorStrategy("ingestion_source"); MessageProcessorRunnable messageProcessorRunnable = new MessageProcessorRunnable( @@ -248,21 +250,20 @@ public void testMessageRetryFail() throws Exception { processor, errorStrategy ); - messageProcessorRunnable.getBlockingQueue().put(new ShardUpdateMessage(null, null, null, 0)); + messageProcessorRunnable.getBlockingQueue() + .put(new ShardUpdateMessage(mock(IngestionShardPointer.class), null, Collections.emptyMap(), 0)); + messageProcessorRunnable.getBlockingQueue() + .put(new ShardUpdateMessage(mock(IngestionShardPointer.class), null, Collections.emptyMap(), -1)); - doThrow(new RuntimeException()).doThrow(new RuntimeException()) - .doThrow(new RuntimeException()) - .doNothing() - .when(processor) - .process(any(), any()); + doThrow(new RuntimeException()).when(processor).process(any(), any()); Thread thread = new Thread(messageProcessorRunnable::run); thread.start(); assertBusy(() -> { - verify(processor, times(3)).process(any(), any()); - assertEquals(1, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageDroppedCounter().count()); - assertEquals(3, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageCounter().count()); - }, 1, TimeUnit.MINUTES); + verify(processor, times(6)).process(any(), any()); + assertEquals(2, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageDroppedCounter().count()); + assertEquals(6, messageProcessorRunnable.getMessageProcessorMetrics().failedMessageCounter().count()); + }, 2, TimeUnit.MINUTES); messageProcessorRunnable.close(); thread.interrupt();