Skip to content

Commit f8e5c22

Browse files
committed
Tests: Make sure snapshots created with old version of elasticsearch can be restored
Closes #8968
1 parent 7ebf39f commit f8e5c22

29 files changed

+212
-4
lines changed

dev-tools/create-bwc-index.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def start_node(version, release_dir, data_dir, tcp_port, http_port):
113113
'-Des.transport.tcp.port=%s' % tcp_port,
114114
'-Des.http.port=%s' % http_port
115115
]
116-
if version.startswith('0.') or version == '1.0.0.Beta1':
116+
if version.startswith('0.') or version.startswith('1.0.0.Beta') :
117117
cmd.append('-f') # version before 1.0 start in background automatically
118118
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
119119

@@ -150,17 +150,60 @@ def generate_index(client):
150150
logging.info('Running basic asserts on the data added')
151151
run_basic_asserts(client, 'test', 'doc', num_docs)
152152

153+
def snapshot_index(client, cfg):
154+
# Add bogus persistent settings to make sure they can be restored
155+
client.cluster.put_settings(body = {
156+
'persistent': {
157+
'cluster.routing.allocation.exclude.version_attr' : cfg.version
158+
}
159+
})
160+
client.indices.put_template(name = 'template_' + cfg.version.lower(), order = 0, body = {
161+
"template" : "te*",
162+
"settings" : {
163+
"number_of_shards" : 1
164+
},
165+
"mappings" : {
166+
"type1" : {
167+
"_source" : { "enabled" : False }
168+
}
169+
},
170+
"aliases" : {
171+
"alias1" : {},
172+
"alias2" : {
173+
"filter" : {
174+
"term" : {"version" : cfg.version }
175+
},
176+
"routing" : "kimchy"
177+
},
178+
"{index}-alias" : {}
179+
}
180+
});
181+
client.snapshot.create_repository(repository='test_repo', body={
182+
'type': 'fs',
183+
'settings': {
184+
'location': cfg.repo_dir
185+
}
186+
})
187+
client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)
188+
153189
def compress_index(version, tmp_dir, output_dir):
190+
compress(tmp_dir, output_dir, 'index-%s.zip' % version, 'data')
191+
192+
def compress_repo(version, tmp_dir, output_dir):
193+
compress(tmp_dir, output_dir, 'repo-%s.zip' % version, 'repo')
194+
195+
def compress(tmp_dir, output_dir, zipfile, directory):
154196
abs_output_dir = os.path.abspath(output_dir)
155-
zipfile = os.path.join(abs_output_dir, 'index-%s.zip' % version)
197+
zipfile = os.path.join(abs_output_dir, zipfile)
156198
if os.path.exists(zipfile):
157199
os.remove(zipfile)
158200
logging.info('Compressing index into %s', zipfile)
159201
olddir = os.getcwd()
160202
os.chdir(tmp_dir)
161-
subprocess.check_call('zip -r %s *' % zipfile, shell=True)
203+
subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
162204
os.chdir(olddir)
163205

206+
164207
def parse_config():
165208
parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests')
166209
parser.add_argument('version', metavar='X.Y.Z',
@@ -184,7 +227,10 @@ def parse_config():
184227

185228
cfg.tmp_dir = tempfile.mkdtemp()
186229
cfg.data_dir = os.path.join(cfg.tmp_dir, 'data')
230+
cfg.repo_dir = os.path.join(cfg.tmp_dir, 'repo')
187231
logging.info('Temp data dir: %s' % cfg.data_dir)
232+
logging.info('Temp repo dir: %s' % cfg.repo_dir)
233+
cfg.snapshot_supported = not (cfg.version.startswith('0.') or cfg.version == '1.0.0.Beta1')
188234

189235
return cfg
190236

@@ -193,17 +239,21 @@ def main():
193239
datefmt='%Y-%m-%d %I:%M:%S %p')
194240
logging.getLogger('elasticsearch').setLevel(logging.ERROR)
195241
logging.getLogger('urllib3').setLevel(logging.WARN)
196-
197242
cfg = parse_config()
198243
try:
199244
node = start_node(cfg.version, cfg.release_dir, cfg.data_dir, cfg.tcp_port, cfg.http_port)
200245
client = create_client(cfg.http_port)
201246
generate_index(client)
247+
if cfg.snapshot_supported:
248+
snapshot_index(client, cfg)
202249
finally:
203250
if 'node' in vars():
204251
logging.info('Shutting down node with pid %d', node.pid)
205252
node.terminate()
253+
time.sleep(1) # some nodes take time to terminate
206254
compress_index(cfg.version, cfg.tmp_dir, cfg.output_dir)
255+
if cfg.snapshot_supported:
256+
compress_repo(cfg.version, cfg.tmp_dir, cfg.output_dir)
207257

208258
if __name__ == '__main__':
209259
try:
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.bwcompat;
20+
21+
import org.apache.lucene.util.LuceneTestCase.Slow;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
24+
import org.elasticsearch.action.search.SearchResponse;
25+
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.metadata.IndexMetaData;
27+
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
28+
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
29+
import org.elasticsearch.common.settings.ImmutableSettings;
30+
import org.elasticsearch.rest.RestStatus;
31+
import org.elasticsearch.snapshots.AbstractSnapshotTests;
32+
import org.elasticsearch.snapshots.RestoreInfo;
33+
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
34+
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
35+
import org.junit.Test;
36+
37+
import java.io.IOException;
38+
import java.lang.reflect.Modifier;
39+
import java.net.URI;
40+
import java.nio.file.DirectoryStream;
41+
import java.nio.file.Files;
42+
import java.nio.file.Path;
43+
import java.nio.file.Paths;
44+
import java.util.List;
45+
import java.util.Locale;
46+
import java.util.SortedSet;
47+
import java.util.TreeSet;
48+
49+
import static com.google.common.collect.Lists.newArrayList;
50+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
51+
import static org.hamcrest.Matchers.*;
52+
53+
@Slow
54+
@ClusterScope(scope = Scope.TEST)
55+
public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
56+
57+
58+
@Test
59+
public void restoreOldSnapshots() throws Exception {
60+
String repo = "test_repo";
61+
String snapshot = "test_1";
62+
List<String> repoVersions = repoVersions();
63+
assertThat(repoVersions.size(), greaterThan(0));
64+
for (String version : repoVersions) {
65+
createRepo(version, repo);
66+
testOldSnapshot(version, repo, snapshot);
67+
}
68+
69+
SortedSet<String> expectedVersions = new TreeSet<>();
70+
for (java.lang.reflect.Field field : Version.class.getDeclaredFields()) {
71+
if (Modifier.isStatic(field.getModifiers()) && field.getType() == Version.class) {
72+
Version v = (Version) field.get(Version.class);
73+
if (v.snapshot()) continue;
74+
if (v.onOrBefore(Version.V_1_0_0_Beta1)) continue;
75+
76+
expectedVersions.add(v.toString());
77+
}
78+
}
79+
80+
for (String repoVersion : repoVersions) {
81+
if (expectedVersions.remove(repoVersion) == false) {
82+
logger.warn("Old repositories tests contain extra repo: " + repoVersion);
83+
}
84+
}
85+
if (expectedVersions.isEmpty() == false) {
86+
StringBuilder msg = new StringBuilder("Old repositories tests are missing versions:");
87+
for (String expected : expectedVersions) {
88+
msg.append("\n" + expected);
89+
}
90+
fail(msg.toString());
91+
}
92+
}
93+
94+
public static List<String> repoVersions() throws Exception {
95+
List<String> repoVersions = newArrayList();
96+
Path repoFiles = Paths.get(RestoreBackwardsCompatTests.class.getResource(".").toURI());
97+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(repoFiles, "repo-*.zip")) {
98+
for (Path entry : stream) {
99+
String fileName = entry.getFileName().toString();
100+
String version = fileName.substring("repo-".length());
101+
version = version.substring(0, version.length() - ".zip".length());
102+
repoVersions.add(version);
103+
}
104+
}
105+
return repoVersions;
106+
}
107+
108+
private void createRepo(String version, String repo) throws Exception {
109+
String repoFile = "repo-" + version + ".zip";
110+
URI repoFileUri = getClass().getResource(repoFile).toURI();
111+
URI repoJarUri = new URI("jar:" + repoFileUri.toString() + "!/repo/");
112+
logger.info("--> creating repository [{}] for version [{}]", repo, version);
113+
assertAcked(client().admin().cluster().preparePutRepository(repo)
114+
.setType("url").setSettings(ImmutableSettings.settingsBuilder()
115+
.put("url", repoJarUri.toString())));
116+
}
117+
118+
private void testOldSnapshot(String version, String repo, String snapshot) throws IOException {
119+
logger.info("--> restoring snapshot");
120+
RestoreSnapshotResponse response = client().admin().cluster().prepareRestoreSnapshot(repo, snapshot).setRestoreGlobalState(true).setWaitForCompletion(true).get();
121+
assertThat(response.status(), equalTo(RestStatus.OK));
122+
RestoreInfo restoreInfo = response.getRestoreInfo();
123+
assertThat(restoreInfo.successfulShards(), greaterThan(0));
124+
assertThat(restoreInfo.successfulShards(), equalTo(restoreInfo.totalShards()));
125+
assertThat(restoreInfo.failedShards(), equalTo(0));
126+
String index = restoreInfo.indices().get(0);
127+
128+
logger.info("--> check search");
129+
SearchResponse searchResponse = client().prepareSearch(index).get();
130+
assertThat(searchResponse.getHits().totalHits(), greaterThan(1L));
131+
132+
logger.info("--> check settings");
133+
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
134+
assertThat(clusterState.metaData().persistentSettings().get(FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP + "version_attr"), equalTo(version));
135+
136+
logger.info("--> check templates");
137+
IndexTemplateMetaData template = clusterState.getMetaData().templates().get("template_" + version.toLowerCase(Locale.ROOT));
138+
assertThat(template, notNullValue());
139+
assertThat(template.template(), equalTo("te*"));
140+
assertThat(template.settings().getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1), equalTo(1));
141+
assertThat(template.mappings().size(), equalTo(1));
142+
assertThat(template.mappings().get("type1").string(), equalTo("{\"type1\":{\"_source\":{\"enabled\":false}}}"));
143+
if (Version.fromString(version).onOrAfter(Version.V_1_1_0)) {
144+
// Support for aliases in templates was added in v1.1.0
145+
assertThat(template.aliases().size(), equalTo(3));
146+
assertThat(template.aliases().get("alias1"), notNullValue());
147+
assertThat(template.aliases().get("alias2").filter().string(), containsString(version));
148+
assertThat(template.aliases().get("alias2").indexRouting(), equalTo("kimchy"));
149+
assertThat(template.aliases().get("{index}-alias"), notNullValue());
150+
}
151+
152+
logger.info("--> cleanup");
153+
cluster().wipeIndices(restoreInfo.indices().toArray(new String[restoreInfo.indices().size()]));
154+
cluster().wipeTemplates();
155+
156+
}
157+
}
158+
Binary file not shown.
11.4 KB
Binary file not shown.
24.9 KB
Binary file not shown.
21.5 KB
Binary file not shown.
19.7 KB
Binary file not shown.
21.4 KB
Binary file not shown.
16.1 KB
Binary file not shown.
10.3 KB
Binary file not shown.

0 commit comments

Comments
 (0)