1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Apply cluster state metadata and routing table diff when building cluster state from remote ([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250))
- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([#18270](https://github.com/opensearch-project/OpenSearch/pull/18270))
- Support cluster write block in pull-based ingestion ([#18280](https://github.com/opensearch-project/OpenSearch/pull/18280))

### Changed

@@ -204,4 +204,12 @@ protected void recreateKafkaTopics(int numKafkaPartitions) {
cleanup();
setupKafka(numKafkaPartitions);
}

protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) {
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled))
.get();
}
}
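The helper above flips the per-index `index.blocks.write` setting; the resulting write block is surfaced to the ingestion poller through the cluster state. As a point of reference (not part of this change), a test could confirm the block is visible there with a sketch like the following, which assumes the standard `client()` helper from OpenSearchIntegTestCase and an import of `org.opensearch.cluster.block.ClusterBlockLevel`:

```java
// Hypothetical check, not part of this PR: after setWriteBlock(indexName, true),
// the index should report a WRITE-level block in the cluster state -- the same
// signal the pull-based ingestion poller reacts to.
boolean blocked = client().admin()
    .cluster()
    .prepareState()
    .get()
    .getState()
    .blocks()
    .indexBlocked(ClusterBlockLevel.WRITE, indexName);
assertTrue(blocked);
```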
@@ -449,6 +449,55 @@ public void testExternalVersioningWithDisabledGCDeletes() throws Exception {

}

public void testClusterWriteBlock() throws Exception {
// setup nodes and index
produceData("1", "name1", "24");
produceData("2", "name2", "20");
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndexWithDefaultSettings(1, 1);
ensureGreen(indexName);
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));

// create a write block
setWriteBlock(indexName, true);
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused"));
});

// verify write block state in poller is persisted
produceData("3", "name3", "30");
produceData("4", "name4", "31");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertTrue(nodeB.equals(primaryNodeName(indexName)));

final String nodeC = internalCluster().startDataOnlyNode();
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
ensureGreen(indexName);
assertTrue(nodeC.equals(replicaNodeName(indexName)));
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused"));
});
assertEquals(2, getSearchableDocCount(nodeB));

// remove write block
setWriteBlock(indexName, false);
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.isWriteBlockEnabled() == false);
});
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
}

private void verifyRemoteStoreEnabled(String node) {
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
@@ -662,6 +662,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult
wrapper,
getInstanceFromNode(CircuitBreakerService.class),
env.nodeId(),
getInstanceFromNode(ClusterService.class),
listener
);
shardRef.set(newShard);
@@ -688,6 +689,7 @@ public static final IndexShard newIndexShard(
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper,
final CircuitBreakerService cbs,
final String nodeId,
final ClusterService clusterService,
final IndexingOperationListener... listeners
) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
@@ -726,7 +728,8 @@ public static final IndexShard newIndexShard(
false,
OpenSearchTestCase::randomBoolean,
() -> indexService.getIndexSettings().getRefreshInterval(),
indexService.getRefreshMutex()
indexService.getRefreshMutex(),
clusterService.getClusterApplierService()
);
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java
@@ -28,36 +28,37 @@
* @opensearch.experimental
*/
@ExperimentalApi
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused)
implements
Writeable,
ToXContentFragment {
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused,
boolean isWriteBlockEnabled) implements Writeable, ToXContentFragment {

private static final String SHARD = "shard";
private static final String POLLER_STATE = "poller_state";
private static final String ERROR_POLICY = "error_policy";
private static final String POLLER_PAUSED = "poller_paused";
private static final String WRITE_BLOCK_ENABLED = "write_block_enabled";

public ShardIngestionState() {
this("", -1, "", "", false);
this("", -1, "", "", false, false);

}

public ShardIngestionState(StreamInput in) throws IOException {
this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean());
this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean(), in.readBoolean());
}

public ShardIngestionState(
String index,
int shardId,
@Nullable String pollerState,
@Nullable String errorPolicy,
boolean isPollerPaused
boolean isPollerPaused,
boolean isWriteBlockEnabled
) {
this.index = index;
this.shardId = shardId;
this.pollerState = pollerState;
this.errorPolicy = errorPolicy;
this.isPollerPaused = isPollerPaused;
this.isWriteBlockEnabled = isWriteBlockEnabled;
}

@Override
@@ -67,6 +68,7 @@
out.writeOptionalString(pollerState);
out.writeOptionalString(errorPolicy);
out.writeBoolean(isPollerPaused);
out.writeBoolean(isWriteBlockEnabled);
}

@Override
@@ -76,6 +78,7 @@
builder.field(POLLER_STATE, pollerState);
builder.field(ERROR_POLICY, errorPolicy);
builder.field(POLLER_PAUSED, isPollerPaused);
builder.field(WRITE_BLOCK_ENABLED, isWriteBlockEnabled);

builder.endObject();
return builder;
}
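Since the new `isWriteBlockEnabled` flag is appended to both the `StreamInput` constructor and `writeTo`, the wire order stays symmetric. A minimal round-trip sketch (an illustration, not code from this PR) using the usual `BytesStreamOutput` test utility:

```java
// Hypothetical round-trip: the boolean must be read back in the same position it
// was written (after isPollerPaused), otherwise the stream is mis-aligned.
BytesStreamOutput out = new BytesStreamOutput();
ShardIngestionState original = new ShardIngestionState("my-index", 0, "PAUSED", "DROP", true, true);
original.writeTo(out);
ShardIngestionState copy = new ShardIngestionState(out.bytes().streamInput());
assert copy.isWriteBlockEnabled() == original.isWriteBlockEnabled();
```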
@@ -727,7 +727,8 @@ protected void closeInternal() {
shardLevelRefreshEnabled,
fixedRefreshIntervalSchedulingEnabled,
this::getRefreshInterval,
refreshMutex
refreshMutex,
clusterService.getClusterApplierService()
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
@@ -41,6 +41,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
@@ -113,6 +114,7 @@ public final class EngineConfig {
private final BooleanSupplier startedPrimarySupplier;
private final Comparator<LeafReader> leafSorter;
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
private final ClusterApplierService clusterApplierService;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -303,6 +305,7 @@ private EngineConfig(Builder builder) {
this.leafSorter = builder.leafSorter;
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
this.indexReaderWarmer = builder.indexReaderWarmer;
this.clusterApplierService = builder.clusterApplierService;
}

/**
@@ -576,6 +579,13 @@ public Comparator<LeafReader> getLeafSorter() {
return this.leafSorter;
}

/**
* Returns the ClusterApplierService instance.
*/
public ClusterApplierService getClusterApplierService() {
return this.clusterApplierService;
}

/**
* Builder for EngineConfig class
*
@@ -611,6 +621,7 @@ public static class Builder {
private Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
Comparator<LeafReader> leafSorter;
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
private ClusterApplierService clusterApplierService;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
@@ -757,6 +768,11 @@ public Builder indexReaderWarmer(IndexWriter.IndexReaderWarmer indexReaderWarmer
return this;
}

public Builder clusterApplierService(ClusterApplierService clusterApplierService) {
this.clusterApplierService = clusterApplierService;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}
@@ -18,6 +18,7 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.shard.ShardId;
@@ -158,7 +159,8 @@ public EngineConfig newEngineConfig(
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter,
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
IndexWriter.IndexReaderWarmer indexReaderWarmer
IndexWriter.IndexReaderWarmer indexReaderWarmer,
ClusterApplierService clusterApplierService
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
@@ -194,6 +196,7 @@
.leafSorter(leafSorter)
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
.indexReaderWarmer(indexReaderWarmer)
.clusterApplierService(clusterApplierService)
.build();
}

@@ -14,6 +14,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.common.lease.Releasable;
@@ -132,6 +133,19 @@ public void start() {
ingestionSource.getNumProcessorThreads(),
ingestionSource.getBlockingQueueSize()
);

// Register the poller with the ClusterService for receiving cluster state updates.
// Also initialize cluster write block state in the poller.
if (engineConfig.getClusterApplierService() != null) {
engineConfig.getClusterApplierService().addListener(streamPoller);
boolean isWriteBlockEnabled = engineConfig.getClusterApplierService()
.state()
.blocks()
.indexBlocked(ClusterBlockLevel.WRITE, engineConfig.getIndexSettings().getIndex().getName());
streamPoller.setWriteBlockEnabled(isWriteBlockEnabled);
}

// start the polling loop
streamPoller.start();
}
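The poller is registered as a cluster state listener, so later write-block changes reach it through `clusterChanged` callbacks rather than only at engine start. The sketch below is a hypothetical illustration of that listener side (the actual StreamPoller implementation is not shown in this diff); the class and field names are assumptions:

```java
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.block.ClusterBlockLevel;

// Hypothetical poller-side listener: recompute the write-block flag from each
// applied cluster state, mirroring the initial check in IngestionEngine.start().
class WriteBlockAwarePoller implements ClusterStateListener {
    private final String indexName;               // index this poller ingests into
    private volatile boolean writeBlockEnabled;   // polling pauses while true

    WriteBlockAwarePoller(String indexName) {
        this.indexName = indexName;
    }

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        writeBlockEnabled = event.state().blocks().indexBlocked(ClusterBlockLevel.WRITE, indexName);
    }

    boolean isWriteBlockEnabled() {
        return writeBlockEnabled;
    }
}
```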

@@ -512,7 +526,9 @@ public ShardIngestionState getIngestionState() {
engineConfig.getShardId().getId(),
streamPoller.getState().toString(),
streamPoller.getErrorStrategy().getName(),
streamPoller.isPaused()
streamPoller.isPaused(),
streamPoller.isWriteBlockEnabled()
);
}
}
@@ -76,6 +76,7 @@
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
@@ -375,6 +376,7 @@ Runnable getGlobalCheckpointSyncer() {
private final Supplier<TimeValue> refreshInterval;
private final Object refreshMutex;
private volatile AsyncShardRefreshTask refreshTask;
private final ClusterApplierService clusterApplierService;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -411,7 +413,8 @@ public IndexShard(
final boolean shardLevelRefreshEnabled,
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
final Supplier<TimeValue> refreshInterval,
final Object refreshMutex
final Object refreshMutex,
final ClusterApplierService clusterApplierService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@@ -518,6 +521,7 @@ public boolean shouldCache(Query query) {
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
this.refreshInterval = refreshInterval;
this.refreshMutex = Objects.requireNonNull(refreshMutex);
this.clusterApplierService = clusterApplierService;
synchronized (this.refreshMutex) {
if (shardLevelRefreshEnabled) {
startRefreshTask();
@@ -4127,7 +4131,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null, // DESC @timestamp default order for
// timeseries
() -> docMapper(),
mergedSegmentWarmerFactory.get(this)
mergedSegmentWarmerFactory.get(this),
clusterApplierService
);
}
