CHANGELOG.md (1 addition, 0 deletions)
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Upgrade opensearch-protobufs dependency to 0.13.0 and update transport-grpc module compatibility ([#19007](https://github.com/opensearch-project/OpenSearch/issues/19007))
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
@@ -110,7 +110,7 @@ public void testFileIngestion() throws Exception {
assertEquals(0, ingestionState.getFailedShards());
assertTrue(
Arrays.stream(ingestionState.getShardStates())
-                .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"))
+                .allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"))
);
});

@@ -129,7 +129,7 @@ public void testFileIngestion() throws Exception {
Arrays.stream(ingestionState.getShardStates())
.allMatch(
state -> state.isPollerPaused() == false
-                && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
+                && (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
)
);
});
@@ -12,24 +12,38 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Requests;
import org.junit.Assert;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.awaitility.Awaitility.await;

@@ -236,4 +250,273 @@ public void testMultiThreadedWrites() throws Exception {
return response.getHits().getTotalHits().value() == 1000;
});
}

public void testAllActiveIngestion() throws Exception {
// Create pull-based index in default replication mode (docrep) and publish some messages

internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
for (int i = 0; i < 10; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

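// "ingestion_source.all_active" is the setting introduced by this PR: with it, the replica runs
// its own poller against the source instead of receiving operations from the primary, making
// pull-based ingestion behave like document replication (docrep)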
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
);

ensureYellowAndNoInitializingShards(indexName);
waitForSearchableDocs(10, List.of(nodeA));
flush(indexName);

// add a second node and verify the replica ingests the data
final String nodeB = internalCluster().startDataOnlyNode();
ensureGreen(indexName);
assertEquals(nodeA, primaryNodeName(indexName));
assertEquals(nodeB, replicaNodeName(indexName));
waitForSearchableDocs(10, List.of(nodeB));

// verify pause and resume functionality on replica
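// with all-active ingestion both the primary and replica expose poller state, so the
// pause/resume APIs should report two shard states transitioning together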

// pause ingestion
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
assertTrue(pauseResponse.isAcknowledged());
assertTrue(pauseResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 2
&& ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
});

for (int i = 10; i < 20; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

// replica must not ingest when paused
Thread.sleep(1000);
assertEquals(10, getSearchableDocCount(nodeB));

// resume ingestion
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
assertTrue(resumeResponse.isAcknowledged());
assertTrue(resumeResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 2
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(
state -> state.isPollerPaused() == false
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
);
});

// verify replica ingests data after resuming ingestion
waitForSearchableDocs(20, List.of(nodeA, nodeB));

// produce 10 more messages
for (int i = 20; i < 30; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

// Add new node and wait for new node to join cluster
final String nodeC = internalCluster().startDataOnlyNode();
assertBusy(() -> {
assertEquals(
"Should have 4 nodes total (1 cluster manager + 3 data)",
4,
internalCluster().clusterService().state().nodes().getSize()
);
}, 30, TimeUnit.SECONDS);

// move replica from nodeB to nodeC
ensureGreen(indexName);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(indexName, 0, nodeB, nodeC)).get();
ensureGreen(indexName);

// confirm replica ingests messages after moving to new node
waitForSearchableDocs(30, List.of(nodeA, nodeC));

for (int i = 30; i < 40; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

// restart replica node and verify ingestion
internalCluster().restartNode(nodeC);
ensureGreen(indexName);
waitForSearchableDocs(40, List.of(nodeA, nodeC));

// Verify both primary and replica do not have failed messages
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
assertNotNull(shardTypeToStats.get("primary"));
assertNotNull(shardTypeToStats.get("replica"));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
// the restarted replica's poller stats are reset (it consumes only the 10 newest messages),
// so only dropped/failure counts are asserted for it here
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));

GetIngestionStateResponse ingestionState = getIngestionState(indexName);
assertEquals(2, ingestionState.getShardStates().length);
assertEquals(0, ingestionState.getFailedShards());
}

public void testReplicaPromotionOnAllActiveIngestion() throws Exception {
// Create pull-based index in default replication mode (docrep) and publish some messages
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
for (int i = 0; i < 10; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
);

ensureYellowAndNoInitializingShards(indexName);
waitForSearchableDocs(10, List.of(nodeA));

// add second node
final String nodeB = internalCluster().startDataOnlyNode();
ensureGreen(indexName);
assertEquals(nodeA, primaryNodeName(indexName));
assertEquals(nodeB, replicaNodeName(indexName));
waitForSearchableDocs(10, List.of(nodeB));

// Validate replica promotion
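// stopping the primary node should promote the replica; since each shard polls the source
// independently under all-active ingestion, the promoted shard keeps ingesting on its own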
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertEquals(nodeB, primaryNodeName(indexName));
for (int i = 10; i < 20; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

waitForSearchableDocs(20, List.of(nodeB));

// add third node and allocate the replica once the node joins the cluster
final String nodeC = internalCluster().startDataOnlyNode();
assertBusy(() -> { assertEquals(3, internalCluster().clusterService().state().nodes().getSize()); }, 30, TimeUnit.SECONDS);
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
ensureGreen(indexName);
waitForSearchableDocs(20, List.of(nodeC));
}

public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
// Create pull-based index in default replication mode (docrep) and publish some messages
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
for (int i = 0; i < 20; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
);
ensureGreen(indexName);
waitForSearchableDocs(20, List.of(nodeA, nodeB));

// Register snapshot repository
String snapshotRepositoryName = "test-snapshot-repo";
String snapshotName = "snapshot-1";
assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepositoryName)
.setType("fs")
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", false))
);

// Take snapshot
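// flush so the ingested docs (and, presumably, the poller's current offset) are committed
// and captured by the snapshot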
flush(indexName);
CreateSnapshotResponse snapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepositoryName, snapshotName)
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
assertTrue(snapshotResponse.getSnapshotInfo().successfulShards() > 0);

// Delete Index
assertAcked(client().admin().indices().prepareDelete(indexName));
waitForState(() -> {
ClusterState state = client().admin().cluster().prepareState().setIndices(indexName).get().getState();
return state.getRoutingTable().hasIndex(indexName) == false && state.getMetadata().hasIndex(indexName) == false;
});

for (int i = 20; i < 40; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}

// Restore Index from Snapshot
client().admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepositoryName, snapshotName)
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
ensureGreen(indexName);

refresh(indexName);
waitForSearchableDocs(40, List.of(nodeA, nodeB));

// Verify both primary and replica have polled only remaining 20 messages
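// i.e. polling resumed from the offset captured in the snapshot rather than from the
// "earliest" reset pointer, so only the 20 messages produced after the snapshot were polled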
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
assertNotNull(shardTypeToStats.get("primary"));
assertNotNull(shardTypeToStats.get("replica"));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(20L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(20L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
}

// returns PollingIngestStats for single primary and single replica
private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
ShardStats[] shards = indexStats.getShards();
assertEquals(2, shards.length);
Map<String, PollingIngestStats> shardTypeToStats = new HashMap<>();
for (ShardStats shardStats : shards) {
if (shardStats.getShardRouting().primary()) {
shardTypeToStats.put("primary", shardStats.getPollingIngestStats());
} else {
shardTypeToStats.put("replica", shardStats.getPollingIngestStats());
}
}

return shardTypeToStats;
}
}