CHANGELOG.md (1 change: 1 addition & 0 deletions)
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
@@ -749,6 +749,59 @@ public void testIndexRelocation() throws Exception {
waitForSearchableDocs(4, List.of(nodeB));
}

public void testKafkaConnectionLost() throws Exception {
// Step 1: Create 2 nodes
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

// Step 2: Create index
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "earliest")
.put("index.routing.allocation.require._name", nodeA)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);
ensureGreen(indexName);
assertTrue(nodeA.equals(primaryNodeName(indexName)));

// Step 3: Write documents and verify
produceData("1", "name1", "24");
produceData("2", "name2", "20");
refresh(indexName);
waitForSearchableDocs(2, List.of(nodeA));
flush(indexName);

// Step 4: Stop Kafka and relocate index to nodeB
kafka.stop();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.routing.allocation.require._name", nodeB))
.get()
);

// Step 5: Wait for relocation to complete
waitForState(() -> nodeB.equals(primaryNodeName(indexName)));

// Step 6: Ensure index is searchable on nodeB even though Kafka is down
ensureGreen(indexName);
waitForSearchableDocs(2, List.of(nodeB));
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats.getConsumerStats().totalConsumerErrorCount() > 0;
});
}
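
The `waitForState` helper used in steps 5 and 6 comes from the Kafka ingestion test base class and is not part of this diff. A minimal sketch of what such a helper could look like, assuming it simply polls the supplied condition via `assertBusy` (the signature and timeout here are assumptions):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the waitForState test helper (the real one lives in
// the test base class and may differ): poll until the condition returns true
// or the timeout expires.
protected void waitForState(Callable<Boolean> checkState) throws Exception {
    assertBusy(() -> {
        if (checkState.call() == false) {
            fail("expected state was not reached");
        }
    }, 1, TimeUnit.MINUTES);
}
```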

private void verifyRemoteStoreEnabled(String node) {
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
@@ -133,6 +133,24 @@ public void testPauseAndResumeAPIs() throws Exception {
});
}

// Validates that shard initialization does not fail due to Kafka connection errors.
public void testShardInitializationUsingUnknownTopic() throws Exception {
createIndexWithMappingSource(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "unknownTopic")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
mappings
);
ensureGreen(indexName);
}
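
The index reaches green here even though `unknownTopic` does not exist, because consumer creation no longer happens during shard initialization. A hypothetical follow-up assertion (not part of this PR) could confirm that the failure surfaces through stats instead, mirroring the check in `testKafkaConnectionLost`:

```java
// Hypothetical extension of this test: the unknown topic should eventually
// show up as consumer errors in PollingIngestStats rather than as a shard failure.
waitForState(() -> {
    PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get()
        .getIndex(indexName).getShards()[0].getPollingIngestStats();
    return stats.getConsumerStats().totalConsumerErrorCount() > 0;
});
```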

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
@@ -25,7 +25,6 @@
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.DocumentMapperForType;
@@ -98,11 +97,7 @@ private void initializeStreamPoller(
+ engineConfig.getIndexSettings().getIndex().getName()
+ "-"
+ engineConfig.getShardId().getId();
IngestionShardConsumer ingestionShardConsumer = this.ingestionConsumerFactory.createShardConsumer(
clientId,
engineConfig.getShardId().getId()
);
logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId());

Map<String, String> commitData = commitDataAsMap(indexWriter);
StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType();
String resetValue = ingestionSource.getPointerInitReset().getValue();
@@ -146,20 +141,25 @@ private void initializeStreamPoller(
StreamPoller.State initialPollerState = indexMetadata.getIngestionStatus().isPaused()
? StreamPoller.State.PAUSED
: StreamPoller.State.NONE;
streamPoller = new DefaultStreamPoller(

// initialize the stream poller
DefaultStreamPoller.Builder streamPollerBuilder = new DefaultStreamPoller.Builder(
startPointer,
persistedPointers,
ingestionShardConsumer,
this,
resetState,
resetValue,
ingestionErrorStrategy,
initialPollerState,
ingestionSource.getMaxPollSize(),
ingestionSource.getPollTimeout(),
ingestionSource.getNumProcessorThreads(),
ingestionSource.getBlockingQueueSize()
ingestionConsumerFactory,
clientId,
engineConfig.getShardId().getId(),
this
);
streamPoller = streamPollerBuilder.resetState(resetState)
.resetValue(resetValue)
.errorStrategy(ingestionErrorStrategy)
.initialState(initialPollerState)
.maxPollSize(ingestionSource.getMaxPollSize())
.pollTimeout(ingestionSource.getPollTimeout())
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
.build();
registerStreamPollerListener();

// start the polling loop
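
This hunk carries the substance of the fix: the engine no longer calls `ingestionConsumerFactory.createShardConsumer(...)` itself (see the lines removed above), but instead hands the factory, client id, and shard id to the `DefaultStreamPoller.Builder`, so the poller owns consumer creation and a broken connection can no longer fail engine construction. A self-contained sketch of the deferred-creation pattern this enables, with assumed names (the real `DefaultStreamPoller` internals may differ):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: the poller, not the engine, creates the consumer,
// so a connection failure increments an error counter instead of failing
// shard initialization.
class LazyConsumerPoller {
    interface ShardConsumer {}                 // stand-in for IngestionShardConsumer
    interface ConsumerFactory {
        ShardConsumer create(String clientId, int shardId);
    }

    private final ConsumerFactory factory;
    private final String clientId;
    private final int shardId;
    private final AtomicLong consumerErrorCount = new AtomicLong();
    private volatile ShardConsumer consumer;   // null until first successful creation

    LazyConsumerPoller(ConsumerFactory factory, String clientId, int shardId) {
        this.factory = factory;
        this.clientId = clientId;
        this.shardId = shardId;
    }

    // Called from the poll loop and retried each iteration until it succeeds.
    ShardConsumer getConsumer() {
        if (consumer == null) {
            try {
                consumer = factory.create(clientId, shardId);
            } catch (Exception e) {
                consumerErrorCount.incrementAndGet(); // surfaces via stats, not a shard failure
            }
        }
        return consumer;
    }
}
```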
@@ -575,6 +575,10 @@ private void resetStreamPoller(StreamPoller.ResetState resetState, String resetV
throw new IllegalStateException("Cannot reset consumer when poller is not paused");
}

if (streamPoller.getConsumer() == null) {
throw new IllegalStateException("Consumer is not yet initialized");
}

try {
// refresh is needed for persisted pointers to be visible
refresh("reset poller", SearcherScope.INTERNAL, true);