Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
91819a0
Add a setting to control dangling index allocation
pugnascotia Oct 28, 2019
fedf771
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 10, 2019
982be6f
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 11, 2019
865831e
Fix allocate setting and implement ITs
pugnascotia Nov 14, 2019
32d3646
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 15, 2019
e377772
Finish fixing unit tests
pugnascotia Nov 15, 2019
6393357
Rename the new setting
pugnascotia Nov 15, 2019
7950e86
WIP - trying to make new setting static
pugnascotia Nov 15, 2019
ce3b5f4
Add docs for gateway.auto_import_dangling_indices
pugnascotia Nov 15, 2019
77b4f09
Fix ITs
pugnascotia Nov 15, 2019
3a453e9
Add missing license
pugnascotia Nov 15, 2019
54db504
Fix test
pugnascotia Nov 19, 2019
44834b8
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 19, 2019
6bf3e9b
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 21, 2019
71bcbde
Address review feedback
pugnascotia Nov 21, 2019
d156385
Address review feedback
pugnascotia Nov 27, 2019
f4f3340
Address review comments
pugnascotia Nov 27, 2019
7cd372a
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 27, 2019
a9dfc48
Checkstyle
pugnascotia Nov 27, 2019
85f7f57
Address review comments
pugnascotia Nov 28, 2019
2a40971
Address review feedback
pugnascotia Nov 29, 2019
77a7649
Merge remote-tracking branch 'upstream/master' into allocate-dangling…
pugnascotia Nov 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.DanglingIndicesState;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
import org.elasticsearch.http.HttpTransportSettings;
Expand Down Expand Up @@ -184,6 +185,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.THRESHOLD_SETTING,
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
Expand All @@ -55,19 +56,35 @@ public class DanglingIndicesState implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(DanglingIndicesState.class);

public static final Setting<Boolean> AUTO_IMPORT_DANGLING_INDICES_SETTING = Setting.boolSetting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's deprecate this setting right away, so that we can remove it in 8.0 if we want to.

"gateway.auto_import_dangling_indices",
true,
Setting.Property.NodeScope
);

private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final LocalAllocateDangledIndices allocateDangledIndices;

private final Map<Index, IndexMetaData> danglingIndices = ConcurrentCollections.newConcurrentMap();

private boolean allocateDanglingIndices;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field is superfluous now?


@Inject
public DanglingIndicesState(NodeEnvironment nodeEnv, MetaStateService metaStateService,
LocalAllocateDangledIndices allocateDangledIndices, ClusterService clusterService) {
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.allocateDangledIndices = allocateDangledIndices;
clusterService.addListener(this);
this.allocateDanglingIndices = AUTO_IMPORT_DANGLING_INDICES_SETTING.get(clusterService.getSettings());

if (this.allocateDanglingIndices) {
clusterService.addListener(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, we no longer scan for new dangling indices, and therefore no longer warn about the existence of dangling indices on disk. We should keep this in mind when working out a new story for managing dangling indices via an API. Perhaps we should think about still emitting a warning in the logs. Let's add a TODO for this on the meta issue.

}
}

public void setAllocateDanglingIndicesSetting(boolean allocateDanglingIndices) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is superfluous now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, yes, good catch.

this.allocateDanglingIndices = allocateDanglingIndices;
}

/**
Expand Down Expand Up @@ -171,10 +188,15 @@ private IndexMetaData stripAliases(IndexMetaData indexMetaData) {
* Allocates the provided list of the dangled indices by sending them to the master node
* for allocation.
*/
private void allocateDanglingIndices() {
void allocateDanglingIndices() {
if (this.allocateDanglingIndices == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this condition is superfluous now?

return;
}

if (danglingIndices.isEmpty()) {
return;
}

try {
allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
Expand All @@ -35,16 +36,21 @@
import java.nio.file.StandardCopyOption;
import java.util.Map;

import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DanglingIndicesStateTests extends ESTestCase {

private static Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build();
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid reformatting code unrelated to the PR.

.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build();

public void testCleanupWhenEmpty() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
Expand All @@ -57,11 +63,11 @@ public void testCleanupWhenEmpty() throws Exception {
assertTrue(danglingState.getDanglingIndices().isEmpty());
}
}

public void testDanglingIndicesDiscovery() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
MetaStateService metaStateService = new MetaStateService(env, xContentRegistry());
DanglingIndicesState danglingState = createDanglingIndicesState(env, metaStateService);

assertTrue(danglingState.getDanglingIndices().isEmpty());
MetaData metaData = MetaData.builder().build();
final Settings.Builder settings = Settings.builder().put(indexSettings).put(IndexMetaData.SETTING_INDEX_UUID, "test1UUID");
Expand Down Expand Up @@ -155,7 +161,6 @@ public void testDanglingIndicesNotImportedWhenTombstonePresent() throws Exceptio
final IndexGraveyard graveyard = IndexGraveyard.builder().addTombstone(dangledIndex.getIndex()).build();
final MetaData metaData = MetaData.builder().indexGraveyard(graveyard).build();
assertThat(danglingState.findNewDanglingIndices(metaData).size(), equalTo(0));

}
}

Expand All @@ -181,7 +186,68 @@ public void testDanglingIndicesStripAliases() throws Exception {
}
}

/**
 * Checks that, with the auto-import setting set to {@code false}, calling
 * {@code allocateDanglingIndices()} never reaches the {@link LocalAllocateDangledIndices} mock.
 */
public void testDanglingIndicesAreNotAllocatedWhenDisabled() throws Exception {
    try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
        final MetaStateService stateService = new MetaStateService(nodeEnvironment, xContentRegistry());
        final LocalAllocateDangledIndices allocator = mock(LocalAllocateDangledIndices.class);
        final DanglingIndicesState state = createDanglingIndicesState(nodeEnvironment, stateService, allocator, false);

        // Nothing has been written yet, so nothing should be reported as dangling.
        assertTrue(state.getDanglingIndices().isEmpty());

        // Write index metadata through the state service; the metadata handed to the
        // dangling-index scan below is empty and so does not contain this index.
        final Settings.Builder danglingIndexSettings = Settings.builder()
            .put(indexSettings)
            .put(IndexMetaData.SETTING_INDEX_UUID, "test1UUID");
        final IndexMetaData danglingIndex = IndexMetaData.builder("test1").settings(danglingIndexSettings).build();
        stateService.writeIndex("test_write", danglingIndex);

        final MetaData emptyMetaData = MetaData.builder().build();
        state.findNewAndAddDanglingIndices(emptyMetaData);

        state.allocateDanglingIndices();

        // Auto-import was disabled, so the allocator must not have been invoked.
        verify(allocator, never()).allocateDangled(any(), any());
    }
}

/**
 * Checks that, with the auto-import setting set to {@code true}, calling
 * {@code allocateDanglingIndices()} invokes the {@link LocalAllocateDangledIndices} mock.
 */
public void testDanglingIndicesAreAllocatedWhenEnabled() throws Exception {
    try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
        final MetaStateService stateService = new MetaStateService(nodeEnvironment, xContentRegistry());
        final LocalAllocateDangledIndices allocator = mock(LocalAllocateDangledIndices.class);
        final DanglingIndicesState state = createDanglingIndicesState(nodeEnvironment, stateService, allocator, true);

        // Nothing has been written yet, so nothing should be reported as dangling.
        assertTrue(state.getDanglingIndices().isEmpty());

        // Write index metadata through the state service; the metadata handed to the
        // dangling-index scan below is empty and so does not contain this index.
        final Settings.Builder danglingIndexSettings = Settings.builder()
            .put(indexSettings)
            .put(IndexMetaData.SETTING_INDEX_UUID, "test1UUID");
        final IndexMetaData danglingIndex = IndexMetaData.builder("test1").settings(danglingIndexSettings).build();
        stateService.writeIndex("test_write", danglingIndex);

        final MetaData emptyMetaData = MetaData.builder().build();
        state.findNewAndAddDanglingIndices(emptyMetaData);

        state.allocateDanglingIndices();

        // Auto-import was enabled, so the allocator must have been invoked.
        verify(allocator).allocateDangled(any(), any());
    }
}

/**
 * Creates a {@link DanglingIndicesState} for tests that do not exercise allocation: no index
 * allocator is supplied ({@code null}) and auto-import is requested as enabled.
 *
 * Delegates to the four-argument overload so that the {@link ClusterService} mock always has
 * its settings stubbed; the previous direct construction with a bare mock was left behind as
 * a second, conflicting {@code return} statement and has been removed.
 *
 * @param env              the node environment the state should scan
 * @param metaStateService the service used to read index metadata from disk
 * @return a new {@link DanglingIndicesState} instance
 */
private DanglingIndicesState createDanglingIndicesState(NodeEnvironment env, MetaStateService metaStateService) {
    return createDanglingIndicesState(env, metaStateService, null, true);
}

/**
 * Creates a {@link DanglingIndicesState} wired to a mocked {@link ClusterService}.
 *
 * The mock returns the built-in cluster settings from {@code getClusterSettings()} and, from
 * {@code getSettings()}, a settings object containing only the auto-import setting with the
 * requested value.
 *
 * @param env              the node environment passed to the state
 * @param metaStateService the metadata state service passed to the state
 * @param indexAllocator   the allocator passed to the state (may be a mock or {@code null})
 * @param shouldAutoImport the value to store under the auto-import setting key
 * @return a new {@link DanglingIndicesState} instance
 */
private DanglingIndicesState createDanglingIndicesState(
    NodeEnvironment env,
    MetaStateService metaStateService,
    LocalAllocateDangledIndices indexAllocator,
    boolean shouldAutoImport
) {
    final Settings.Builder nodeSettings = Settings.builder();
    nodeSettings.put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), shouldAutoImport);
    final Settings autoImportSettings = nodeSettings.build();

    final ClusterSettings builtInClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

    final ClusterService mockedClusterService = mock(ClusterService.class);
    when(mockedClusterService.getClusterSettings()).thenReturn(builtInClusterSettings);
    when(mockedClusterService.getSettings()).thenReturn(autoImportSettings);

    return new DanglingIndicesState(env, metaStateService, indexAllocator, mockedClusterService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;

import static org.elasticsearch.cluster.metadata.IndexGraveyard.SETTING_MAX_TOMBSTONES;
import static org.elasticsearch.gateway.DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class DanglingIndicesIT extends ESIntegTestCase {
private static final String INDEX_NAME = "test-idx-1";

private static final int MIN_DOC_COUNT = 500;
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have these constants here? If you're always passing the same shard and replica count to createAndPopulateIndex, there is no point in having these. I find there to be a lot of ceremony in this class that is not adding anything.


/**
 * Builds the node settings used when starting the test cluster.
 *
 * @param importDanglingIndices the value to store under the auto-import dangling indices setting
 * @return settings with the tombstone limit set to zero and the auto-import setting applied
 */
private Settings buildSettings(boolean importDanglingIndices) {
    final Settings.Builder nodeSettings = Settings.builder();

    // Keep no tombstones in the index graveyard, so that a deleted index
    // is definitely considered to be dangling.
    nodeSettings.put(SETTING_MAX_TOMBSTONES.getKey(), 0);
    nodeSettings.put(AUTO_IMPORT_DANGLING_INDICES_SETTING.getKey(), importDanglingIndices);

    return nodeSettings.build();
}

/**
* Check that when dangling indices are discovered, then they are recovered into
* the cluster, so long as the recovery setting is enabled.
*/
public void testDanglingIndicesAreRecoveredWhenSettingIsEnabled() throws Exception {
logger.info("--> starting cluster");
final Settings settings = buildSettings(true);
internalCluster().startNodes(3, settings);

// Create an index and distribute it across the 3 nodes
createAndPopulateIndex(INDEX_NAME, SHARD_COUNT, REPLICA_COUNT);
ensureGreen();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createAndPopulateIndex already has a call to ensureGreen.


// This is so that when the node comes back up, we have a dangling index that can be recovered.
logger.info("--> restarted a random node and deleting the index while it's down");
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {

@Override
public Settings onNodeStopped(String nodeName) throws Exception {
deleteIndex(INDEX_NAME);
return super.onNodeStopped(nodeName);
}
});

ensureGreen();

assertTrue("Expected dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee that the dangling index has been recovered at this point, as that has an asynchronous notification / import process. This line needs to be put into an `assertBusy`.

}

/**
* Check that when dangling indices are discovered, then they are not recovered into
* the cluster when the recovery setting is disabled.
*/
public void testDanglingIndicesAreNotRecoveredWhenSettingIsDisabled() throws Exception {
logger.info("--> starting cluster");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this logging message add? It's the first line in the test, which is already logged.

internalCluster().startNodes(3, buildSettings(false));

// Create an index and distribute it across the 3 nodes
createAndPopulateIndex(INDEX_NAME, SHARD_COUNT, REPLICA_COUNT);

// Create another index so that once we drop the first index, we
// can still assert that the cluster is green.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this. `ensureGreen` does not require an index to be present?

createAndPopulateIndex(INDEX_NAME + "-other", SHARD_COUNT, REPLICA_COUNT);

ensureGreen();

// This is so that when the node comes back up, we have a dangling index that could
// be recovered, but shouldn't be.
logger.info("--> restarted a random node and deleting the index while it's down");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restart is already logged at info level. Maybe just change the comment to say `// Restart node, deleting the index in its absence, so that there is a dangling index to recover`.

internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {

@Override
public Settings onNodeStopped(String nodeName) throws Exception {
deleteIndex(INDEX_NAME);
return super.onNodeStopped(nodeName);
}
});

ensureGreen();

assertFalse("Did not expect dangling index " + INDEX_NAME + " to be recovered", indexExists(INDEX_NAME));
}

private void createAndPopulateIndex(String name, int shardCount, int replicaCount) throws InterruptedException {
logger.info("--> creating test index: {}", name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logging here is not necessary. Elasticsearch already logs index creations at info level.

assertAcked(
prepareCreate(
name,
Settings.builder()
.put("number_of_shards", shardCount)
.put("number_of_replicas", replicaCount)
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahem...that's copy-and-paste. Can you tell it's the first time I've written an ES IT? 😅

)
);
ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = between(MIN_DOC_COUNT, 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add any docs. Let's keep it simple

final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex(name)
.setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat());
}

indexRandom(true, docs);
flush();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why flush?

assertThat(client().prepareSearch(name).setSize(0).get().getHits().getTotalHits().value, equalTo((long) numDocs));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why check that indexing works ok? Not the point of this test, that is tested elsewhere.


client().admin().indices().prepareStats(name).execute().actionGet();
}

private void deleteIndex(String indexName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why define an extra method here? just so that one logging line can be reused? Feels a bit over the top.

logger.info("--> deleting test index: {}", indexName);

assertAcked(client().admin().indices().prepareDelete(indexName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
public class IndexRecoveryIT extends ESIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final String INDEX_TYPE = "test-type-1";
private static final String REPO_NAME = "test-repo-1";
private static final String SNAP_NAME = "test-snap-1";

Expand Down
Loading