Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,24 @@
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class ShardChangesRestIT extends ESRestTestCase {
private static final String CCR_SHARD_CHANGES_ENDPOINT = "/%s/ccr/shard_changes";
private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk";
private static final String DATA_STREAM_ENDPOINT = "/_data_stream/%s";
private static final String INDEX_TEMPLATE_ENDPOINT = "/_index_template/%s";

private static final String[] SHARD_RESPONSE_FIELDS = new String[] {
"took_in_millis",
"operations",
"shard_id",
"index_abstraction",
"index",
"settings_version",
"max_seq_no_of_updates_or_deletes",
Expand All @@ -46,6 +52,11 @@ public class ShardChangesRestIT extends ESRestTestCase {
"aliases_version",
"max_seq_no",
"global_checkpoint" };

private static final String BULK_INDEX_TEMPLATE = """
{ "index": { "op_type": "create" } }
{ "@timestamp": "%s", "name": "%s" }
""";;
private static final String[] NAMES = { "skywalker", "leia", "obi-wan", "yoda", "chewbacca", "r2-d2", "c-3po", "darth-vader" };
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
Expand Down Expand Up @@ -99,13 +110,86 @@ public void testShardChangesDefaultParams() throws IOException {
createIndex(indexName, settings, mappings);
assertTrue(indexExists(indexName));

assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(10, 20))));
assertOK(bulkIndex(indexName, randomIntBetween(10, 20)));

final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName));
final Response response = client().performRequest(shardChangesRequest);
assertOK(response);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false)
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false),
indexName
);
}

public void testDataStreamShardChangesDefaultParams() throws IOException {
final String templateName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT);
assertOK(createIndexTemplate(templateName, """
{
"index_patterns": [ "test-*-*" ],
"data_stream": {},
"priority": 100,
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"name": {
"type": "keyword"
}
}
}
}
}"""));

final String dataStreamName = "test-"
+ randomAlphanumericOfLength(5).toLowerCase(Locale.ROOT)
+ "-"
+ randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
assertOK(createDataStream(dataStreamName));

assertOK(bulkIndex(dataStreamName, randomIntBetween(10, 20)));

final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(dataStreamName));
final Response response = client().performRequest(shardChangesRequest);
assertOK(response);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false),
dataStreamName
);
}

public void testIndexAliasShardChangesDefaultParams() throws IOException {
final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
final String aliasName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
final String mappings = """
{
"properties": {
"name": {
"type": "keyword"
}
}
}
""";
createIndex(indexName, settings, mappings);
assertTrue(indexExists(indexName));

final Request putAliasRequest = new Request("PUT", "/" + indexName + "/_alias/" + aliasName);
assertOK(client().performRequest(putAliasRequest));

assertOK(bulkIndex(aliasName, randomIntBetween(10, 20)));

final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(aliasName));
final Response response = client().performRequest(shardChangesRequest);
assertOK(response);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false),
aliasName
);
}

Expand All @@ -121,7 +205,7 @@ public void testShardChangesWithAllParameters() throws IOException {
);
assertTrue(indexExists(indexName));

assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200))));
assertOK(bulkIndex(indexName, randomIntBetween(100, 200)));

final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName));
shardChangesRequest.addParameter("from_seq_no", "0");
Expand All @@ -132,7 +216,8 @@ public void testShardChangesWithAllParameters() throws IOException {
final Response response = client().performRequest(shardChangesRequest);
assertOK(response);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false)
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false),
indexName
);
}

Expand All @@ -148,7 +233,7 @@ public void testShardChangesMultipleRequests() throws IOException {
);
assertTrue(indexExists(indexName));

assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200))));
assertOK(bulkIndex(indexName, randomIntBetween(100, 200)));

final Request firstRequest = new Request("GET", shardChangesEndpoint(indexName));
firstRequest.addParameter("from_seq_no", "0");
Expand All @@ -159,7 +244,8 @@ public void testShardChangesMultipleRequests() throws IOException {
final Response firstResponse = client().performRequest(firstRequest);
assertOK(firstResponse);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false)
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false),
indexName
);

final Request secondRequest = new Request("GET", shardChangesEndpoint(indexName));
Expand All @@ -171,7 +257,8 @@ public void testShardChangesMultipleRequests() throws IOException {
final Response secondResponse = client().performRequest(secondRequest);
assertOK(secondResponse);
assertShardChangesResponse(
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false)
XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false),
indexName
);
}

Expand Down Expand Up @@ -231,17 +318,36 @@ public void testShardChangesMissingIndex() throws IOException {
assertResponseException(ex, RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]");
}

private static Request bulkRequest(final String indexName, int numberOfDocuments) {
private static Response bulkIndex(final String indexName, int numberOfDocuments) throws IOException {
final StringBuilder sb = new StringBuilder();

long timestamp = System.currentTimeMillis();
for (int i = 0; i < numberOfDocuments; i++) {
sb.append(String.format(Locale.ROOT, "{ \"index\": { \"_id\": \"%d\" } }\n{ \"name\": \"%s\" }\n", i + 1, randomFrom(NAMES)));
sb.append(
String.format(
Locale.ROOT,
BULK_INDEX_TEMPLATE,
Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
randomFrom(NAMES)
)
);
timestamp += 1000; // 1 second
}

final Request request = new Request("POST", bulkEndpoint(indexName));
request.setJsonEntity(sb.toString());
request.addParameter("refresh", "true");
return request;
return client().performRequest(request);
}

private Response createDataStream(final String dataStreamName) throws IOException {
return client().performRequest(new Request("PUT", dataStreamEndpoint(dataStreamName)));
}

private static Response createIndexTemplate(final String templateName, final String mappings) throws IOException {
final Request request = new Request("PUT", indexTemplateEndpoint(templateName));
request.setJsonEntity(mappings);
return client().performRequest(request);
}

private static String shardChangesEndpoint(final String indexName) {
Expand All @@ -252,16 +358,28 @@ private static String bulkEndpoint(final String indexName) {
return String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName);
}

private static String dataStreamEndpoint(final String dataStreamName) {
return String.format(Locale.ROOT, DATA_STREAM_ENDPOINT, dataStreamName);
}

private static String indexTemplateEndpoint(final String templateName) {
return String.format(Locale.ROOT, INDEX_TEMPLATE_ENDPOINT, templateName);
}

private void assertResponseException(final ResponseException ex, final RestStatus restStatus, final String error) {
assertEquals(restStatus.getStatus(), ex.getResponse().getStatusLine().getStatusCode());
assertThat(ex.getMessage(), Matchers.containsString(error));
}

private void assertShardChangesResponse(final Map<String, Object> shardChangesResponseBody) {
private void assertShardChangesResponse(final Map<String, Object> shardChangesResponseBody, final String indexAbstractionName) {
for (final String fieldName : SHARD_RESPONSE_FIELDS) {
final Object fieldValue = shardChangesResponseBody.get(fieldName);
assertNotNull("Field " + fieldName + " is missing or has a null value.", fieldValue);

if ("index_abstraction".equals(fieldName)) {
assertEquals(indexAbstractionName, fieldValue);
}

if ("operations".equals(fieldName)) {
if (fieldValue instanceof List<?> operationsList) {
assertFalse("Field 'operations' is empty.", operationsList.isEmpty());
Expand Down
Loading