Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -64,32 +66,28 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(allowedIndex, numDocs, "*:*");
} else {
followIndex(client(), "leader_cluster", allowedIndex, allowedIndex);
followIndex("leader_cluster", allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(allowedIndex, numDocs, "*:*"));
assertThat(countCcrNodeTasks(), equalTo(1));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
assertThat(getCcrNodeTasks(), contains(new CcrNodeTask("leader_cluster", allowedIndex, allowedIndex, 0)));
pauseFollow(allowedIndex);
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});

closeIndex(allowedIndex);
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
unfollow(allowedIndex);
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

Expand All @@ -98,7 +96,7 @@ public void testFollowIndex() throws Exception {
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
Expand All @@ -113,7 +111,7 @@ public void testFollowIndex() throws Exception {
);
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

followIndex(adminClient(), "leader_cluster", unallowedIndex, unallowedIndex);
pauseFollow(adminClient(), unallowedIndex);
Expand All @@ -127,7 +125,7 @@ public void testFollowIndex() throws Exception {
+ "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"
)
);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));

e = expectThrows(
ResponseException.class,
Expand All @@ -138,12 +136,13 @@ public void testFollowIndex() throws Exception {
closeIndexRequest.addParameter("wait_for_active_shards", "0");
assertOK(adminClient().performRequest(closeIndexRequest));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
assertBusy(() -> assertThat(getCcrNodeTasks(), empty()));
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

String allowedIndex = "logs-eu_20190101";
String disallowedIndex = "logs-us_20190101";

Expand Down Expand Up @@ -172,20 +171,21 @@ public void testAutoFollowPatterns() throws Exception {
}
}

assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(allowedIndex, 5, "*:*");
}, 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> {
verifyCcrMonitoring(allowedIndex, allowedIndex);
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);

// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(client(), allowedIndex);
try {
assertBusy(() -> ensureYellow(allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(() -> verifyDocuments(allowedIndex, 5, "*:*"), 30, TimeUnit.SECONDS);
assertThat(indexExists(disallowedIndex), is(false));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex), 30, TimeUnit.SECONDS);
assertBusy(ESCCRRestTestCase::verifyAutoFollowMonitoring, 30, TimeUnit.SECONDS);
} finally {
// Cleanup by deleting auto follow pattern and pause following:
try {
deleteAutoFollowPattern("test_pattern");
pauseFollow(allowedIndex);
} catch (Throwable e) {
logger.warn("Failed to cleanup after the test", e);
}
}
}

public void testForgetFollower() throws IOException {
Expand All @@ -202,7 +202,7 @@ public void testForgetFollower() throws IOException {
final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats"));
final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid");

assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));
pauseFollow(forgetFollower);

try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) {
final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
Expand Down Expand Up @@ -255,24 +255,17 @@ public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
} else {
logger.info("running against follower cluster");
followIndex(client(), "leader_cluster", cleanLeader, cleanFollower);

final Request request = new Request("DELETE", "/" + cleanFollower);
final Response response = client().performRequest(request);
assertOK(response);
deleteIndex(client(), cleanFollower);
// the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
assertNoPersistentTasks();
assertThat(getCcrNodeTasks(), empty());
});
}
}

public void testUnPromoteAndFollowDataStream() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
assumeTrue("Test should only run with target_cluster=follow", "follow".equals(targetCluster));

int numDocs = 64;
String dataStreamName = "logs-eu-monitor1";
Expand All @@ -282,7 +275,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
{
createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
}
// Create data stream and ensure that is is auto followed
// Create data stream and ensure that it is auto followed
{
try (RestClient leaderClient = buildLeaderClient()) {
for (int i = 0; i < numDocs; i++) {
Expand All @@ -302,11 +295,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
// promote and unfollow
{
Request promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
assertOK(client().performRequest(promoteRequest));
assertOK(client().performRequest(new Request("POST", "/_data_stream/_promote/" + dataStreamName)));
// Now that the data stream is a non replicated data stream, rollover.
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest));
assertOK(client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
// Unfollow .ds-logs-eu-monitor1-000001,
// which is now possible because this index can now be closed as it is no longer the write index.
pauseFollow(backingIndexName(dataStreamName, 1));
Expand All @@ -315,4 +306,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
}
}

private static void assertNoPersistentTasks() throws IOException {
Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks, empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -266,23 +269,78 @@ protected static void ensureYellow(final String index, final RestClient client)
});
}

protected int countCcrNodeTasks() throws IOException {
protected Set<CcrNodeTask> getCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
Map<String, Object> rsp1 = toMap(adminClient().performRequest(request));
Map<?, ?> nodes = (Map<?, ?>) rsp1.get("nodes");
assertThat(nodes.size(), equalTo(1));
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
Map<?, ?> nodeTasks = (Map<?, ?>) node.get("tasks");
int numNodeTasks = 0;
HashSet<CcrNodeTask> ccrNodeTasks = new HashSet<CcrNodeTask>();
for (Map.Entry<?, ?> entry : nodeTasks.entrySet()) {
Map<?, ?> nodeTask = (Map<?, ?>) entry.getValue();
String action = (String) nodeTask.get("action");
if (action.startsWith("xpack/ccr/shard_follow_task")) {
numNodeTasks++;
Map<?, ?> status = (Map<?, ?>) nodeTask.get("status");
ccrNodeTasks.add(
new CcrNodeTask(
(String) status.get("remote_cluster"),
(String) status.get("leader_index"),
(String) status.get("follower_index"),
(Integer) status.get("shard_id")
)
);
}
}
return numNodeTasks;
return ccrNodeTasks;
}

protected class CcrNodeTask {
private final String remoteCluster;
private final String leaderIndex;
private final String followerIndex;
private final int shardId;

public CcrNodeTask(String remoteCluster, String leaderIndex, String followerIndex, int shardId) {
this.remoteCluster = remoteCluster;
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CcrNodeTask that = (CcrNodeTask) o;
return Objects.equals(remoteCluster, that.remoteCluster)
&& Objects.equals(leaderIndex, that.leaderIndex)
&& Objects.equals(followerIndex, that.followerIndex)
&& shardId == that.shardId;
}

@Override
public int hashCode() {
return Objects.hash(remoteCluster, leaderIndex, followerIndex, shardId);
}

@Override
public String toString() {
return "CcrNodeTask{remoteCluster='"
+ remoteCluster
+ "', leaderIndex='"
+ leaderIndex
+ "', followerIndex='"
+ followerIndex
+ "', shardId="
+ shardId
+ '}';
}
}

protected static void createIndex(String name, Settings settings) throws IOException {
Expand Down