Skip to content

Commit db4a5ad

Browse files
Move consumer initialization to the poller to prevent engine failure
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent d52d404 commit db4a5ad

File tree

5 files changed

+338
-67
lines changed

5 files changed

+338
-67
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Set;
3838
import java.util.concurrent.ExecutionException;
3939

40+
import static org.hamcrest.Matchers.greaterThan;
4041
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
4142
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
4243
import static org.hamcrest.Matchers.is;
@@ -749,6 +750,47 @@ public void testIndexRelocation() throws Exception {
749750
waitForSearchableDocs(4, List.of(nodeB));
750751
}
751752

753+
public void testKafkaConnectionLost() throws Exception {
754+
// Step 1: Create 2 nodes
755+
internalCluster().startClusterManagerOnlyNode();
756+
final String nodeA = internalCluster().startDataOnlyNode();
757+
final String nodeB = internalCluster().startDataOnlyNode();
758+
759+
// Step 2: Create index
760+
createIndexWithDefaultSettings(1, 0);
761+
ensureGreen(indexName);
762+
assertTrue(nodeA.equals(primaryNodeName(indexName)));
763+
764+
// Step 3: Write documents and verify
765+
produceData("1", "name1", "24");
766+
produceData("2", "name2", "20");
767+
refresh(indexName);
768+
waitForSearchableDocs(2, List.of(nodeA));
769+
flush(indexName);
770+
771+
// Step 4: Stop kafka and relocate index to nodeB
772+
kafka.stop();
773+
assertAcked(
774+
client().admin()
775+
.indices()
776+
.prepareUpdateSettings(indexName)
777+
.setSettings(Settings.builder().put("index.routing.allocation.require._name", nodeB))
778+
.get()
779+
);
780+
781+
// Step 5: Wait for relocation to complete
782+
waitForState(() -> nodeB.equals(primaryNodeName(indexName)));
783+
784+
// Step 6: Ensure index is searchable on nodeB even though kafka is down
785+
ensureGreen(indexName);
786+
waitForSearchableDocs(2, List.of(nodeB));
787+
waitForState(() -> {
788+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
789+
.getPollingIngestStats();
790+
return stats.getConsumerStats().totalConsumerErrorCount() > 0;
791+
});
792+
}
793+
752794
private void verifyRemoteStoreEnabled(String node) {
753795
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
754796
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ public void testPauseAndResumeAPIs() throws Exception {
133133
});
134134
}
135135

136+
// This test validates shard initialization does not fail due to kafka connection errors.
137+
public void testShardInitializationUsingUnknownTopic() throws Exception {
138+
createIndexWithMappingSource(
139+
indexName,
140+
Settings.builder()
141+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
142+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
143+
.put("ingestion_source.type", "kafka")
144+
.put("ingestion_source.pointer.init.reset", "earliest")
145+
.put("ingestion_source.param.topic", "unknownTopic")
146+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
147+
.put("index.replication.type", "SEGMENT")
148+
.build(),
149+
mappings
150+
);
151+
ensureGreen(indexName);
152+
}
153+
136154
private void setupKafka() {
137155
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
138156
// disable topic auto creation

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,7 @@ private void initializeStreamPoller(
9898
+ engineConfig.getIndexSettings().getIndex().getName()
9999
+ "-"
100100
+ engineConfig.getShardId().getId();
101-
IngestionShardConsumer ingestionShardConsumer = this.ingestionConsumerFactory.createShardConsumer(
102-
clientId,
103-
engineConfig.getShardId().getId()
104-
);
105-
logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId());
101+
106102
Map<String, String> commitData = commitDataAsMap(indexWriter);
107103
StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType();
108104
String resetValue = ingestionSource.getPointerInitReset().getValue();
@@ -146,20 +142,25 @@ private void initializeStreamPoller(
146142
StreamPoller.State initialPollerState = indexMetadata.getIngestionStatus().isPaused()
147143
? StreamPoller.State.PAUSED
148144
: StreamPoller.State.NONE;
149-
streamPoller = new DefaultStreamPoller(
145+
146+
// initialize the stream poller
147+
DefaultStreamPoller.Builder streamPollerBuilder = new DefaultStreamPoller.Builder(
150148
startPointer,
151149
persistedPointers,
152-
ingestionShardConsumer,
153-
this,
154-
resetState,
155-
resetValue,
156-
ingestionErrorStrategy,
157-
initialPollerState,
158-
ingestionSource.getMaxPollSize(),
159-
ingestionSource.getPollTimeout(),
160-
ingestionSource.getNumProcessorThreads(),
161-
ingestionSource.getBlockingQueueSize()
162-
);
150+
ingestionConsumerFactory,
151+
clientId,
152+
engineConfig.getShardId().getId(),
153+
this);
154+
streamPoller = streamPollerBuilder
155+
.resetState(resetState)
156+
.resetValue(resetValue)
157+
.errorStrategy(ingestionErrorStrategy)
158+
.initialState(initialPollerState)
159+
.maxPollSize(ingestionSource.getMaxPollSize())
160+
.pollTimeout(ingestionSource.getPollTimeout())
161+
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
162+
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
163+
.build();
163164
registerStreamPollerListener();
164165

165166
// start the polling loop
@@ -575,6 +576,10 @@ private void resetStreamPoller(StreamPoller.ResetState resetState, String resetV
575576
throw new IllegalStateException("Cannot reset consumer when poller is not paused");
576577
}
577578

579+
if (streamPoller.getConsumer() == null) {
580+
throw new OpenSearchException("Consumer is not yet initialized");
581+
}
582+
578583
try {
579584
// refresh is needed for persisted pointers to be visible
580585
refresh("reset poller", SearcherScope.INTERNAL, true);

0 commit comments

Comments
 (0)