diff --git a/CHANGELOG.md b/CHANGELOG.md index abd4ee1fad4d4..64c03e324f359 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256)) - Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250))) - Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([18270](https://github.com/opensearch-project/OpenSearch/pull18270))) +- Support cluster write block in pull-based ingestion ([#18280](https://github.com/opensearch-project/OpenSearch/pull/18280))) ### Changed diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java index 76e6331b33cea..445ae8f4fed68 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java @@ -204,4 +204,12 @@ protected void recreateKafkaTopics(int numKafkaPartitions) { cleanup(); setupKafka(numKafkaPartitions); } + + protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) { + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled)) + .get(); + } } diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java index 9402f2c88df2b..c2a94cf323fc5 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java @@ -449,6 +449,55 @@ public void testExternalVersioningWithDisabledGCDeletes() throws Exception { } + public void testClusterWriteBlock() throws Exception { + // setup nodes and index + produceData("1", "name1", "24"); + produceData("2", "name2", "20"); + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + final String nodeB = internalCluster().startDataOnlyNode(); + + createIndexWithDefaultSettings(1, 1); + ensureGreen(indexName); + waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB)); + + // create a write block + setWriteBlock(indexName, true); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return ingestionState.getFailedShards() == 0 + && Arrays.stream(ingestionState.getShardStates()) + .allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused")); + }); + + // verify write block state in poller is persisted + produceData("3", "name3", "30"); + produceData("4", "name4", "31"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); + ensureYellowAndNoInitializingShards(indexName); + assertTrue(nodeB.equals(primaryNodeName(indexName))); + + final String nodeC = internalCluster().startDataOnlyNode(); + client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get(); + ensureGreen(indexName); + assertTrue(nodeC.equals(replicaNodeName(indexName))); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return Arrays.stream(ingestionState.getShardStates()) + .allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused")); + }); + assertEquals(2, getSearchableDocCount(nodeB)); + + // remove write block + setWriteBlock(indexName, false); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return ingestionState.getFailedShards() == 0 + && Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.isWriteBlockEnabled() == false); + }); + waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC)); + } + private void verifyRemoteStoreEnabled(String node) { GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get(); String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled"); diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 7209a0bc9fefe..6c6105ac6fa56 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -662,6 +662,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul wrapper, getInstanceFromNode(CircuitBreakerService.class), env.nodeId(), + getInstanceFromNode(ClusterService.class), listener ); shardRef.set(newShard); @@ -688,6 +689,7 @@ public static final IndexShard newIndexShard( CheckedFunction wrapper, final CircuitBreakerService cbs, final String nodeId, + final ClusterService clusterService, final IndexingOperationListener... listeners ) throws IOException { ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); @@ -726,7 +728,8 @@ public static final IndexShard newIndexShard( false, OpenSearchTestCase::randomBoolean, () -> indexService.getIndexSettings().getRefreshInterval(), - indexService.getRefreshMutex() + indexService.getRefreshMutex(), + clusterService.getClusterApplierService() ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java b/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java index 9ba8d57b465cb..ebaed35958f3c 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java @@ -28,22 +28,21 @@ * @opensearch.experimental */ @ExperimentalApi -public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused) - implements - Writeable, - ToXContentFragment { +public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused, + boolean isWriteBlockEnabled) implements Writeable, ToXContentFragment { private static final String SHARD = "shard"; private static final String POLLER_STATE = "poller_state"; private static final String ERROR_POLICY = "error_policy"; private static final String POLLER_PAUSED = "poller_paused"; + private static final String WRITE_BLOCK_ENABLED = "write_block_enabled"; public ShardIngestionState() { - this("", -1, "", "", false); + this("", -1, "", "", false, false); } public ShardIngestionState(StreamInput in) throws IOException { - this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean()); + this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean(), in.readBoolean()); } public ShardIngestionState( @@ -51,13 +50,15 @@ public ShardIngestionState( int shardId, @Nullable String pollerState, @Nullable String errorPolicy, - boolean isPollerPaused + boolean isPollerPaused, + boolean isWriteBlockEnabled ) { this.index = index; this.shardId = shardId; this.pollerState = pollerState; this.errorPolicy = errorPolicy; this.isPollerPaused = isPollerPaused; + this.isWriteBlockEnabled = isWriteBlockEnabled; } @Override @@ -67,6 +68,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(pollerState); out.writeOptionalString(errorPolicy); out.writeBoolean(isPollerPaused); + out.writeBoolean(isWriteBlockEnabled); } @Override @@ -76,6 +78,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(POLLER_STATE, pollerState); builder.field(ERROR_POLICY, errorPolicy); builder.field(POLLER_PAUSED, isPollerPaused); + builder.field(WRITE_BLOCK_ENABLED, isWriteBlockEnabled); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 8c6a53f9d298f..e11cc7df6c824 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -727,7 +727,8 @@ protected void closeInternal() { shardLevelRefreshEnabled, fixedRefreshIntervalSchedulingEnabled, this::getRefreshInterval, - refreshMutex + refreshMutex, + clusterService.getClusterApplierService() ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index f2facbd0dffa9..4af4ceb4094e8 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -41,6 +41,7 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -113,6 +114,7 @@ public final class EngineConfig { private final BooleanSupplier startedPrimarySupplier; private final Comparator leafSorter; private final Supplier documentMapperForTypeSupplier; + private final ClusterApplierService clusterApplierService; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -303,6 +305,7 @@ private EngineConfig(Builder builder) { this.leafSorter = builder.leafSorter; this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier; this.indexReaderWarmer = builder.indexReaderWarmer; + this.clusterApplierService = builder.clusterApplierService; } /** @@ -576,6 +579,13 @@ public Comparator getLeafSorter() { return this.leafSorter; } + /** + * Returns the ClusterApplierService instance. + */ + public ClusterApplierService getClusterApplierService() { + return this.clusterApplierService; + } + /** * Builder for EngineConfig class * @@ -611,6 +621,7 @@ public static class Builder { private Supplier documentMapperForTypeSupplier; Comparator leafSorter; private IndexWriter.IndexReaderWarmer indexReaderWarmer; + private ClusterApplierService clusterApplierService; public Builder shardId(ShardId shardId) { this.shardId = shardId; @@ -757,6 +768,11 @@ public Builder indexReaderWarmer(IndexWriter.IndexReaderWarmer indexReaderWarmer return this; } + public Builder clusterApplierService(ClusterApplierService clusterApplierService) { + this.clusterApplierService = clusterApplierService; + return this; + } + public EngineConfig build() { return new EngineConfig(this); } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 81fa90dfdd78f..2240b6e76eaac 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -18,6 +18,7 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; +import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.index.shard.ShardId; @@ -158,7 +159,8 @@ public EngineConfig newEngineConfig( TranslogFactory translogFactory, Comparator leafSorter, Supplier documentMapperForTypeSupplier, - IndexWriter.IndexReaderWarmer indexReaderWarmer + IndexWriter.IndexReaderWarmer indexReaderWarmer, + ClusterApplierService clusterApplierService ) { CodecService codecServiceToUse = codecService; if (codecService == null && this.codecServiceFactory != null) { @@ -194,6 +196,7 @@ public EngineConfig newEngineConfig( .leafSorter(leafSorter) .documentMapperForTypeSupplier(documentMapperForTypeSupplier) .indexReaderWarmer(indexReaderWarmer) + .clusterApplierService(clusterApplierService) .build(); } diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 2f145b9cb2b9d..acb2e91928ce0 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -14,6 +14,7 @@ import org.apache.lucene.search.IndexSearcher; import org.opensearch.ExceptionsHelper; import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState; +import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IngestionSource; import org.opensearch.common.lease.Releasable; @@ -132,6 +133,19 @@ public void start() { ingestionSource.getNumProcessorThreads(), ingestionSource.getBlockingQueueSize() ); + + // Register the poller with the ClusterService for receiving cluster state updates. + // Also initialize cluster write block state in the poller. + if (engineConfig.getClusterApplierService() != null) { + engineConfig.getClusterApplierService().addListener(streamPoller); + boolean isWriteBlockEnabled = engineConfig.getClusterApplierService() + .state() + .blocks() + .indexBlocked(ClusterBlockLevel.WRITE, engineConfig.getIndexSettings().getIndex().getName()); + streamPoller.setWriteBlockEnabled(isWriteBlockEnabled); + } + + // start the polling loop streamPoller.start(); } @@ -512,7 +526,9 @@ public ShardIngestionState getIngestionState() { engineConfig.getShardId().getId(), streamPoller.getState().toString(), streamPoller.getErrorStrategy().getName(), - streamPoller.isPaused() + streamPoller.isPaused(), + streamPoller.isWriteBlockEnabled() + ); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 52eabcfb9a486..ddb6009ace9e3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -76,6 +76,7 @@ import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -375,6 +376,7 @@ Runnable getGlobalCheckpointSyncer() { private final Supplier refreshInterval; private final Object refreshMutex; private volatile AsyncShardRefreshTask refreshTask; + private final ClusterApplierService clusterApplierService; public IndexShard( final ShardRouting shardRouting, @@ -411,7 +413,8 @@ public IndexShard( final boolean shardLevelRefreshEnabled, final Supplier fixedRefreshIntervalSchedulingEnabled, final Supplier refreshInterval, - final Object refreshMutex + final Object refreshMutex, + final ClusterApplierService clusterApplierService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -518,6 +521,7 @@ public boolean shouldCache(Query query) { this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled; this.refreshInterval = refreshInterval; this.refreshMutex = Objects.requireNonNull(refreshMutex); + this.clusterApplierService = clusterApplierService; synchronized (this.refreshMutex) { if (shardLevelRefreshEnabled) { startRefreshTask(); @@ -4127,7 +4131,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null, // DESC @timestamp default order for // timeseries () -> docMapper(), - mergedSegmentWarmerFactory.get(this) + mergedSegmentWarmerFactory.get(this), + clusterApplierService ); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index 411e359911093..b0ff45fa8c729 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -10,8 +10,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.common.Nullable; import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.index.IndexSettings; import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; @@ -30,6 +34,7 @@ */ public class DefaultStreamPoller implements StreamPoller { private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class); + private static final int DEFAULT_POLLER_SLEEP_PERIOD_MS = 100; private volatile State state = State.NONE; @@ -39,6 +44,9 @@ public class DefaultStreamPoller implements StreamPoller { private volatile boolean paused; private volatile IngestionErrorStrategy errorStrategy; + // indicates if a local or global cluster write block is in effect + private volatile boolean isWriteBlockEnabled; + private volatile long lastPolledMessageTimestamp = 0; private IngestionShardConsumer consumer; @@ -56,6 +64,7 @@ public class DefaultStreamPoller implements StreamPoller { private int pollTimeout; private Set persistedPointers; + private final String indexName; private final CounterMetric totalPolledCount = new CounterMetric(); private final CounterMetric totalConsumerErrorCount = new CounterMetric(); @@ -98,7 +107,8 @@ public DefaultStreamPoller( errorStrategy, initialState, maxPollSize, - pollTimeout + pollTimeout, + ingestionEngine.config().getIndexSettings() ); } @@ -112,7 +122,8 @@ public DefaultStreamPoller( IngestionErrorStrategy errorStrategy, State initialState, long maxPollSize, - int pollTimeout + int pollTimeout, + IndexSettings indexSettings ) { this.consumer = Objects.requireNonNull(consumer); this.resetState = resetState; @@ -133,6 +144,8 @@ public DefaultStreamPoller( ) ); this.errorStrategy = errorStrategy; + this.indexName = indexSettings.getIndex().getName(); + } @Override @@ -199,11 +212,10 @@ protected void startPoll() { resetState = ResetState.NONE; } - if (paused) { + if (paused || isWriteBlockEnabled) { state = State.PAUSED; try { - // TODO: make sleep time configurable - Thread.sleep(100); + Thread.sleep(DEFAULT_POLLER_SLEEP_PERIOD_MS); } catch (Throwable e) { logger.error("Error in pausing the poller of shard {}: {}", consumer.getShardId(), e); } @@ -417,4 +429,30 @@ public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) { this.errorStrategy = errorStrategy; blockingQueueContainer.updateErrorStrategy(errorStrategy); } + + @Override + public boolean isWriteBlockEnabled() { + return isWriteBlockEnabled; + } + + @Override + public void setWriteBlockEnabled(boolean isWriteBlockEnabled) { + this.isWriteBlockEnabled = isWriteBlockEnabled; + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + try { + if (event.blocksChanged() == false) { + return; + } + + final ClusterState state = event.state(); + isWriteBlockEnabled = state.blocks().indexBlocked(ClusterBlockLevel.WRITE, indexName); + + } catch (Exception e) { + logger.error("Error applying cluster state in stream poller", e); + throw e; + } + } } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java index 16e7c06a49b72..aaff9f13a317d 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java @@ -8,6 +8,7 @@ package org.opensearch.indices.pollingingest; +import org.opensearch.cluster.ClusterStateListener; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IngestionShardPointer; @@ -16,7 +17,7 @@ /** * A poller for reading messages from an ingestion shard. This is used in the ingestion engine. */ -public interface StreamPoller extends Closeable { +public interface StreamPoller extends Closeable, ClusterStateListener { String BATCH_START = "batch_start"; @@ -61,6 +62,16 @@ public interface StreamPoller extends Closeable { */ void updateErrorStrategy(IngestionErrorStrategy errorStrategy); + /** + * Returns if write block is active for the poller. + */ + boolean isWriteBlockEnabled(); + + /** + * Sets write block status for the poller. + */ + void setWriteBlockEnabled(boolean isWriteBlockEnabled); + /** * a state to indicate the current state of the poller */ diff --git a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateResponseTests.java b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateResponseTests.java index 1bef9ed371c18..44879fc5adda2 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateResponseTests.java @@ -19,8 +19,8 @@ public class GetIngestionStateResponseTests extends OpenSearchTestCase { public void testSerialization() throws IOException { ShardIngestionState[] shardStates = new ShardIngestionState[] { - new ShardIngestionState("index1", 0, "POLLING", "DROP", false), - new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true) }; + new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false), + new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true, false) }; GetIngestionStateResponse response = new GetIngestionStateResponse(shardStates, 2, 2, 0, null, Collections.emptyList()); try (BytesStreamOutput out = new BytesStreamOutput()) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionStateTests.java b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionStateTests.java index b0aff39aef8c1..a809706ae0dcd 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionStateTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionStateTests.java @@ -19,7 +19,7 @@ public class ShardIngestionStateTests extends OpenSearchTestCase { public void testSerialization() throws IOException { - ShardIngestionState state = new ShardIngestionState("index1", 0, "POLLING", "DROP", false); + ShardIngestionState state = new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false); try (BytesStreamOutput out = new BytesStreamOutput()) { state.writeTo(out); @@ -30,12 +30,13 @@ public void testSerialization() throws IOException { assertEquals(state.shardId(), deserializedState.shardId()); assertEquals(state.pollerState(), deserializedState.pollerState()); assertEquals(state.isPollerPaused(), deserializedState.isPollerPaused()); + assertEquals(state.isWriteBlockEnabled(), deserializedState.isWriteBlockEnabled()); } } } public void testSerializationWithNullValues() throws IOException { - ShardIngestionState state = new ShardIngestionState("index1", 0, null, null, false); + ShardIngestionState state = new ShardIngestionState("index1", 0, null, null, false, false); try (BytesStreamOutput out = new BytesStreamOutput()) { state.writeTo(out); @@ -52,9 +53,9 @@ public void testSerializationWithNullValues() throws IOException { public void testGroupShardStateByIndex() { ShardIngestionState[] states = new ShardIngestionState[] { - new ShardIngestionState("index1", 0, "POLLING", "DROP", true), - new ShardIngestionState("index1", 1, "PAUSED", "DROP", false), - new ShardIngestionState("index2", 0, "POLLING", "DROP", true) }; + new ShardIngestionState("index1", 0, "POLLING", "DROP", true, false), + new ShardIngestionState("index1", 1, "PAUSED", "DROP", false, false), + new ShardIngestionState("index2", 0, "POLLING", "DROP", true, false) }; Map> groupedStates = ShardIngestionState.groupShardStateByIndex(states); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateActionTests.java index ad1cf1aea6532..ddcbe12c62ec7 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateActionTests.java @@ -117,7 +117,7 @@ public void testShardOperation() { ShardRouting shardRouting = mock(ShardRouting.class); IndexService indexService = mock(IndexService.class); IndexShard indexShard = mock(IndexShard.class); - ShardIngestionState expectedState = new ShardIngestionState("test-index", 0, "POLLING", "DROP", true); + ShardIngestionState expectedState = new ShardIngestionState("test-index", 0, "POLLING", "DROP", true, false); when(shardRouting.shardId()).thenReturn(mock(ShardId.class)); when(shardRouting.shardId().getIndex()).thenReturn(mock(Index.class)); @@ -166,7 +166,9 @@ public void testShardOperationWithAlreadyClosedException() { public void testNewResponse() { GetIngestionStateRequest request = new GetIngestionStateRequest(new String[] { "test-index" }); - List responses = Collections.singletonList(new ShardIngestionState("test-index", 0, "POLLING", "DROP", true)); + List responses = Collections.singletonList( + new ShardIngestionState("test-index", 0, "POLLING", "DROP", true, false) + ); List shardFailures = Collections.emptyList(); ClusterState clusterState = mock(ClusterState.class); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateActionTests.java index 2307a71be7eda..207d421084af5 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateActionTests.java @@ -112,7 +112,7 @@ public void testShardOperation() { ShardRouting shardRouting = mock(ShardRouting.class); IndexService indexService = mock(IndexService.class); IndexShard indexShard = mock(IndexShard.class); - ShardIngestionState expectedState = new ShardIngestionState("test-index", 0, "PAUSED", "DROP", true); + ShardIngestionState expectedState = new ShardIngestionState("test-index", 0, "PAUSED", "DROP", true, false); when(shardRouting.shardId()).thenReturn(mock(ShardId.class)); when(shardRouting.shardId().getIndex()).thenReturn(mock(Index.class)); @@ -161,7 +161,9 @@ public void testShardOperationWithAlreadyClosedException() { public void testNewResponse() { UpdateIngestionStateRequest request = new UpdateIngestionStateRequest(new String[] { "test-index" }, new int[] { 0 }); - List responses = Collections.singletonList(new ShardIngestionState("test-index", 0, "PAUSED", "DROP", true)); + List responses = Collections.singletonList( + new ShardIngestionState("test-index", 0, "PAUSED", "DROP", true, false) + ); List shardFailures = Collections.emptyList(); ClusterState clusterState = mock(ClusterState.class); diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index acb55c33a3265..cf184b29a14ff 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -72,6 +72,7 @@ public void testCreateEngineConfigFromFactory() { new InternalTranslogFactory(), null, null, + null, null ); @@ -154,6 +155,7 @@ public void testCreateCodecServiceFromFactory() { new InternalTranslogFactory(), null, null, + null, null ); assertNotNull(config.getCodec()); diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index a510f92f9dd4c..248c150d341e5 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -9,7 +9,10 @@ package org.opensearch.index.engine; import org.apache.lucene.index.NoMergePolicy; +import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; @@ -40,7 +43,9 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class IngestionEngineTests extends EngineTestCase { @@ -50,6 +55,7 @@ public class IngestionEngineTests extends EngineTestCase { // the messages of the stream to ingest from private List messages; private EngineConfig engineConfig; + private ClusterApplierService clusterApplierService; @Override @Before @@ -62,7 +68,9 @@ public void setUp() throws Exception { messages = new ArrayList<>(); publishData("{\"_id\":\"2\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"); publishData("{\"_id\":\"1\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"); - ingestionEngine = buildIngestionEngine(globalCheckpoint, ingestionEngineStore, indexSettings); + clusterApplierService = mock(ClusterApplierService.class); + when(clusterApplierService.state()).thenReturn(ClusterState.EMPTY_STATE); + ingestionEngine = buildIngestionEngine(globalCheckpoint, ingestionEngineStore, indexSettings, clusterApplierService); } private void publishData(String message) { @@ -114,6 +122,13 @@ public void testCreateEngine() throws IOException { ); Assert.assertEquals(2, persistedPointers.size()); } + + // validate ingestion state on successful engine creation + ShardIngestionState ingestionState = ingestionEngine.getIngestionState(); + assertEquals("test", ingestionState.index()); + assertEquals("DROP", ingestionState.errorPolicy()); + assertFalse(ingestionState.isPollerPaused()); + assertFalse(ingestionState.isWriteBlockEnabled()); } public void testRecovery() throws IOException { @@ -126,7 +141,7 @@ public void testRecovery() throws IOException { publishData("{\"_id\":\"3\",\"_source\":{\"name\":\"john\", \"age\": 30}}"); publishData("{\"_id\":\"4\",\"_source\":{\"name\":\"jane\", \"age\": 25}}"); ingestionEngine.close(); - ingestionEngine = buildIngestionEngine(new AtomicLong(0), ingestionEngineStore, indexSettings); + ingestionEngine = buildIngestionEngine(new AtomicLong(0), ingestionEngineStore, indexSettings, clusterApplierService); waitForResults(ingestionEngine, 4); } @@ -155,7 +170,7 @@ public void testCreationFailure() throws IOException { // overwrite the config with ingestion engine settings String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; MapperService mapperService = createMapperService(mapping); - engineConfig = config(engineConfig, () -> new DocumentMapperForType(mapperService.documentMapper(), null)); + engineConfig = config(engineConfig, () -> new DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService); try { new IngestionEngine(engineConfig, consumerFactory); fail("Expected EngineException to be thrown"); @@ -165,7 +180,12 @@ public void testCreationFailure() throws IOException { } } - private IngestionEngine buildIngestionEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) throws IOException { + private IngestionEngine buildIngestionEngine( + AtomicLong globalCheckpoint, + Store store, + IndexSettings settings, + ClusterApplierService clusterApplierService + ) throws IOException { FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages); if (engineConfig == null) { engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); @@ -173,7 +193,7 @@ private IngestionEngine buildIngestionEngine(AtomicLong globalCheckpoint, Store // overwrite the config with ingestion engine settings String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"; MapperService mapperService = createMapperService(mapping); - engineConfig = config(engineConfig, () -> new DocumentMapperForType(mapperService.documentMapper(), null)); + engineConfig = config(engineConfig, () -> new DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService); if (!Lucene.indexExists(store.directory())) { store.createEmpty(engineConfig.getIndexSettings().getIndexVersionCreated().luceneVersion); final String translogUuid = Translog.createEmptyTranslog( diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 3cb0f8a8e1cdc..503f071dd64e9 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -8,9 +8,20 @@ package org.opensearch.indices.pollingingest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexSettings; import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.engine.FakeIngestionSource; +import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; import org.junit.Before; @@ -18,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -52,6 +64,8 @@ public class DefaultStreamPollerTests extends OpenSearchTestCase { private final int sleepTime = 300; private DropIngestionErrorStrategy errorStrategy; private PartitionedBlockingQueueContainer partitionedBlockingQueueContainer; + private IngestionEngine engine; + private IndexSettings indexSettings; @Before public void setUp() throws Exception { @@ -65,6 +79,8 @@ public void setUp() throws Exception { processorRunnable = new MessageProcessorRunnable(new ArrayBlockingQueue<>(5), processor, errorStrategy); persistedPointers = new HashSet<>(); partitionedBlockingQueueContainer = new PartitionedBlockingQueueContainer(processorRunnable, 0); + engine = mock(IngestionEngine.class); + indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); poller = new DefaultStreamPoller( new FakeIngestionSource.FakeIngestionShardPointer(0), persistedPointers, @@ -75,7 +91,8 @@ public void setUp() throws Exception { errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); partitionedBlockingQueueContainer.startProcessorThreads(); } @@ -136,7 +153,8 @@ public void testSkipProcessed() throws InterruptedException { errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); CountDownLatch latch = new CountDownLatch(2); @@ -176,7 +194,8 @@ public void testResetStateEarliest() throws InterruptedException { errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); CountDownLatch latch = new CountDownLatch(2); doAnswer(invocation -> { @@ -202,7 +221,8 @@ public void testResetStateLatest() throws InterruptedException { errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); poller.start(); @@ -224,7 +244,8 @@ public void testResetStateRewindByOffset() throws InterruptedException { errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocation -> { @@ -302,7 +323,8 @@ public void testDropErrorIngestionStrategy() throws TimeoutException, Interrupte errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); poller.start(); Thread.sleep(sleepTime); @@ -358,7 +380,8 @@ public void testBlockErrorIngestionStrategy() throws TimeoutException, Interrupt errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); poller.start(); Thread.sleep(sleepTime); @@ -391,7 +414,8 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE mockErrorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); poller.start(); Thread.sleep(sleepTime); @@ -457,7 +481,8 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte errorStrategy, StreamPoller.State.NONE, 1000, - 1000 + 1000, + indexSettings ); poller.start(); Thread.sleep(sleepTime); @@ -465,4 +490,38 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(0), poller.getBatchStartPointer()); blockingQueueContainer.close(); } + + public void testClusterStateChange() { + // set write block + ClusterState state1 = ClusterState.builder(ClusterName.DEFAULT).build(); + ClusterState state2 = ClusterState.builder(ClusterName.DEFAULT) + .blocks( + ClusterBlocks.builder() + .addGlobalBlock( + new ClusterBlock(1, "description", true, true, true, RestStatus.ACCEPTED, EnumSet.allOf((ClusterBlockLevel.class))) + ) + ) + .build(); + + ClusterChangedEvent event1 = new ClusterChangedEvent("test", state2, state1); + poller.clusterChanged(event1); + assertTrue(poller.isWriteBlockEnabled()); + + // remove write block + ClusterState state3 = ClusterState.builder(ClusterName.DEFAULT).build(); + ClusterChangedEvent event2 = new ClusterChangedEvent("test", state3, state2); + poller.clusterChanged(event2); + assertFalse(poller.isWriteBlockEnabled()); + + // test no block change + ClusterChangedEvent event3 = new ClusterChangedEvent("test", state3, state3); + poller.clusterChanged(event3); + assertFalse(poller.isWriteBlockEnabled()); + } + + public void testErrorApplyingClusterChange() { + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + doThrow(new RuntimeException()).when(event).blocksChanged(); + assertThrows(RuntimeException.class, () -> poller.clusterChanged(event)); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index d6cd5cfb81dc4..9d96a1fc6f8fb 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -68,6 +68,7 @@ import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.AllocationId; +import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.common.CheckedBiFunction; import org.opensearch.common.Nullable; import org.opensearch.common.Randomness; @@ -991,7 +992,11 @@ protected EngineConfig config( /** * Override config with ingestion engine configs */ - protected EngineConfig config(EngineConfig config, Supplier documentMapperForTypeSupplier) { + protected EngineConfig config( + EngineConfig config, + Supplier documentMapperForTypeSupplier, + ClusterApplierService clusterApplierService + ) { IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( "test", Settings.builder().put(config.getIndexSettings().getSettings()).build() @@ -1019,6 +1024,7 @@ protected EngineConfig config(EngineConfig config, Supplier Boolean.FALSE, indexSettings::getRefreshInterval, - new Object() + new Object(), + clusterService.getClusterApplierService() ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {