From 0513e0af390a4963d60feea60e0f9e0b8e461f42 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 23 Sep 2021 12:55:59 +0200 Subject: [PATCH 01/10] Allow listing older repositories --- qa/repository-old-versions/build.gradle | 125 ++++++ .../oldrepos/OldRepositoryAccessIT.java | 89 ++++ .../repositories/RepositoryData.java | 2 +- .../blobstore/BlobStoreRepository.java | 400 +++++++++++++++++- .../src/main/java/oldes/OldElasticsearch.java | 16 +- 5 files changed, 615 insertions(+), 17 deletions(-) create mode 100644 qa/repository-old-versions/build.gradle create mode 100644 qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle new file mode 100644 index 0000000000000..58786eb09fed9 --- /dev/null +++ b/qa/repository-old-versions/build.gradle @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + + +import org.apache.tools.ant.taskdefs.condition.Os +import org.elasticsearch.gradle.Architecture +import org.elasticsearch.gradle.OS +import org.elasticsearch.gradle.Version +import org.elasticsearch.gradle.internal.info.BuildParams +import org.elasticsearch.gradle.internal.test.AntFixture +import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask + +apply plugin: 'elasticsearch.jdk-download' +apply plugin: 'elasticsearch.internal-testclusters' +apply plugin: 'elasticsearch.standalone-rest-test' + +configurations { + oldesFixture +} + +dependencies { + oldesFixture project(':test:fixtures:old-elasticsearch') +} + +jdks { + legacy { + vendor = 'adoptium' + version = '8u302+b08' + platform = OS.current().name().toLowerCase() + architecture = Architecture.current().name().toLowerCase() + } +} + +if (Os.isFamily(Os.FAMILY_WINDOWS)) { + logger.warn("Disabling repository-old-versions tests because we can't get the pid file on windows") +} else if (rootProject.rootDir.toString().contains(" ")) { + logger.warn("Disabling repository-old-versions tests because Elasticsearch 1.7 won't start with spaces in the path") +} else { + /* Set up tasks to unzip and run the old versions of ES before running the + * integration tests. */ + for (String versionString : ['6.8.18', '5.6.16', '2.4.5', '1.7.6']) { + Version version = Version.fromString(versionString) + String packageName = version.major == 1 ? 'org.elasticsearch' : 'org.elasticsearch.distribution.zip' + String artifact = "${packageName}:elasticsearch:${version}@zip" + String versionNoDots = version.toString().replace('.', '_') + String configName = "es${versionNoDots}" + + configurations.create(configName) + + dependencies.add(configName, artifact) + + // TODO Rene: we should be able to replace these unzip tasks with gradle artifact transforms + TaskProvider unzip = tasks.register("unzipEs${versionNoDots}", Sync) { + Configuration oldEsDependency = configurations[configName] + dependsOn oldEsDependency + /* Use a closure here to delay resolution of the dependency until we need + * it */ + from { + oldEsDependency.collect { zipTree(it) } + } + into temporaryDir + } + + String repoLocation = "${buildDir}/cluster/shared/repo/${versionNoDots}" + + String clusterName = versionNoDots + + testClusters { + "${clusterName}" { + setting 'path.repo', repoLocation + setting 'xpack.security.enabled', 'false' + } + } + + TaskProvider fixture = tasks.register("oldES${versionNoDots}Fixture", AntFixture) { + dependsOn project.configurations.oldesFixture, jdks.legacy + dependsOn unzip + executable = "${BuildParams.runtimeJavaHome}/bin/java" + env 'CLASSPATH', "${-> project.configurations.oldesFixture.asPath}" + // old versions of Elasticsearch need JAVA_HOME + env 'JAVA_HOME', jdks.legacy.javaHomePath + // If we are running on certain arm systems we need to explicitly set the stack size to overcome JDK page size bug + if (Architecture.current() == Architecture.AARCH64) { + env 'ES_JAVA_OPTS', '-Xss512k' + } + args 'oldes.OldElasticsearch', + baseDir, + unzip.get().temporaryDir, + false, + "path.repo: ${repoLocation}" + waitCondition = { fixture, ant -> + // the fixture writes the ports file when Elasticsearch's HTTP service + // is ready, so we can just wait for the file to exist + return fixture.portsFile.exists() + } + } + + tasks.register("javaRestTest#${versionNoDots}", StandaloneRestIntegTestTask) { + useCluster testClusters."${clusterName}" + dependsOn fixture + doFirst { + delete(repoLocation) + mkdir(repoLocation) + } + systemProperty "tests.repo.location", repoLocation + systemProperty "tests.es.version", version.toString() + /* Use a closure on the string to delay evaluation until right before we + * run the integration tests so that we can be sure that the file is + * ready. */ + nonInputProperties.systemProperty "tests.es.port", "${-> fixture.get().addressAndPort}" + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}") + } + + tasks.named("check").configure { + dependsOn "javaRestTest#${versionNoDots}" + } + } +} + diff --git a/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java new file mode 100644 index 0000000000000..2901d9ea87c52 --- /dev/null +++ b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.oldrepos; + +import org.apache.http.HttpHost; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class OldRepositoryAccessIT extends ESRestTestCase { + @Override + protected Map>> wipeSnapshots() { + return Collections.emptyMap(); + } + + public void testOldRepoAccess() throws IOException { + String repoLocation = System.getProperty("tests.repo.location"); + Version oldVersion = Version.fromString(System.getProperty("tests.es.version")); + + int oldEsPort = Integer.parseInt(System.getProperty("tests.es.port")); + try (RestClient oldEs = RestClient.builder(new HttpHost("127.0.0.1", oldEsPort)).build()) { + try { + Request createIndex = new Request("PUT", "/test"); + createIndex.setJsonEntity("{\"settings\":{\"number_of_shards\": 1}}"); + oldEs.performRequest(createIndex); + + for (int i = 0; i < 5; i++) { + Request doc = new Request("PUT", "/test/doc/testdoc" + i); + doc.addParameter("refresh", "true"); + doc.setJsonEntity("{\"test\":\"test" + i + "\", \"val\":" + i + "}"); + oldEs.performRequest(doc); + } + + // register repo on old ES and take snapshot + Request createRepoRequest = new Request("PUT", "/_snapshot/testrepo"); + createRepoRequest.setJsonEntity("{\"type\":\"fs\",\"settings\":{\"location\":\"" + repoLocation + "\"}}"); + oldEs.performRequest(createRepoRequest); + + Request createSnapshotRequest = new Request("PUT", "/_snapshot/testrepo/snap1"); + createSnapshotRequest.addParameter("wait_for_completion", "true"); + oldEs.performRequest(createSnapshotRequest); + + // register repo on new ES + Request createReadRepoRequest = new Request("PUT", "/_snapshot/testrepo"); + createReadRepoRequest.setJsonEntity("{\"type\":\"fs\",\"settings\":{\"location\":\"" + repoLocation + + "\"}}"); + client().performRequest(createReadRepoRequest); + + // list snapshots on new ES + Request listSnapshotsRequest = new Request("GET", "/_snapshot/testrepo/_all"); + listSnapshotsRequest.addParameter("error_trace", "true"); + Response listSnapshotsResponse = client().performRequest(listSnapshotsRequest); + logger.info(Streams.readFully(listSnapshotsResponse.getEntity().getContent()).utf8ToString()); + assertEquals(200, listSnapshotsResponse.getStatusLine().getStatusCode()); + + // list specific snapshot on new ES + Request listSpecificSnapshotsRequest = new Request("GET", "/_snapshot/testrepo/snap1"); + listSpecificSnapshotsRequest.addParameter("error_trace", "true"); + Response listSpecificSnapshotsResponse = client().performRequest(listSnapshotsRequest); + logger.info(Streams.readFully(listSpecificSnapshotsResponse.getEntity().getContent()).utf8ToString()); + assertEquals(200, listSpecificSnapshotsResponse.getStatusLine().getStatusCode()); + + // list advanced snapshot info on new ES + Request listSnapshotStatusRequest = new Request("GET", "/_snapshot/testrepo/snap1/_status"); + listSnapshotStatusRequest.addParameter("error_trace", "true"); + Response listSnapshotStatusResponse = client().performRequest(listSnapshotStatusRequest); + logger.info(Streams.readFully(listSnapshotStatusResponse.getEntity().getContent()).utf8ToString()); + assertEquals(200, listSnapshotStatusResponse.getStatusLine().getStatusCode()); + } finally { + oldEs.performRequest(new Request("DELETE", "/test")); + } + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 0a41ccace73be..142372ec935dd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -42,7 +42,7 @@ * A class that represents the data in a repository, as captured in the * repository's index blob. */ -public final class RepositoryData { +public class RepositoryData { /** * The generation value indicating the repository has no index generational files. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a4ea5a316dc3e..858bb9099a0fb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -38,12 +39,14 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -55,7 +58,9 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.Lucene; @@ -69,16 +74,21 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; @@ -111,10 +121,13 @@ import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.AbortedSnapshotException; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; +import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; @@ -129,7 +142,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -141,6 +156,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -1521,9 +1537,25 @@ private void getOneSnapshotInfo(BlockingQueue queue, GetSnapshotInfo Exception failure = null; SnapshotInfo snapshotInfo = null; try { - snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry); - } catch (NoSuchFileException ex) { - failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); + try { + snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + // try legacy format + if (true /* legacy */) { + try { + snapshotInfo = SNAPSHOT_LEGACY_FORMAT.read( + metadata.name(), + blobContainer(), + snapshotId.getName(), + namedXContentRegistry + ); + } catch (NoSuchFileException ex2) { + failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); + } + } else { + failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); + } + } } catch (IOException | NotXContentException ex) { failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshot info" + snapshotId, ex); } catch (Exception e) { @@ -1558,12 +1590,33 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { @Override public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { - return INDEX_METADATA_FORMAT.read( - metadata.name(), - indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), - namedXContentRegistry - ); + IndexMetadata indexMetadata; + // read index metadata in compatibility mode first to figure out what its version is + try { + indexMetadata = BASIC_INDEX_METADATA_FORMAT.read( + metadata.name(), + indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), + namedXContentRegistry + ); + } catch (NoSuchFileException ex2) { + indexMetadata = LEGACY_INDEX_METADATA_FORMAT.read( + metadata.name(), + indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), + namedXContentRegistry + ); + } + if (indexMetadata.getCreationVersion().onOrAfter(Version.CURRENT.minimumIndexCompatibilityVersion())) { + return INDEX_METADATA_FORMAT.read( + metadata.name(), + indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), + namedXContentRegistry + ); + } else { + return indexMetadata; + } } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -2083,6 +2136,26 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS private RepositoryData getRepositoryData(long indexGen) { if (indexGen == RepositoryData.EMPTY_REPO_GEN) { + // fall back to loading legacy snapshots + final List snapshotIds = snapshots(); + if (snapshotIds.isEmpty() == false) { + return new RepositoryData( + RepositoryData.MISSING_UUID, + RepositoryData.UNKNOWN_REPO_GEN, + snapshotIds.stream().collect(Collectors.toMap(s -> s.getUUID(), s -> s)), + Collections.emptyMap(), + Collections.emptyMap(), + ShardGenerations.EMPTY, + IndexMetaDataGenerations.EMPTY, + RepositoryData.MISSING_UUID + ) { + + @Override + public IndexId resolveIndexId(final String indexName) { + return new IndexId(indexName, indexName); + } + }; + } return RepositoryData.EMPTY; } try { @@ -3303,9 +3376,21 @@ private static List unusedBlobs( */ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { - return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry); - } catch (NoSuchFileException ex) { - throw new SnapshotMissingException(metadata.name(), snapshotId, ex); + try { + return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + // fallback to ES 1.x legacy format + try { + return LEGACY_INDEX_SHARD_SNAPSHOT_FORMAT.read( + metadata.name(), + shardContainer, + snapshotId.getUUID(), + namedXContentRegistry + ); + } catch (NoSuchFileException ex2) { + throw new SnapshotMissingException(metadata.name(), snapshotId, ex); + } + } } catch (IOException ex) { throw new SnapshotException( metadata.name(), @@ -3492,4 +3577,295 @@ private static final class ShardSnapshotMetaDeleteResult { this.blobsToDelete = blobsToDelete; } } + + // Legacy blob store access + private static final String SNAPSHOT_SUFFIX = ".dat"; + private static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; + private static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; + private static final String COMMON_SNAPSHOT_PREFIX = "snap"; + private static final String SNAPSHOTS_INDEX_FILE = "index"; + + public List snapshots() { + final String repositoryName = metadata.name(); + try { + List snapshots = new ArrayList<>(); + Map blobs; + try { + blobs = blobContainer().listBlobsByPrefix(COMMON_SNAPSHOT_PREFIX); + } catch (UnsupportedOperationException ex) { + // Fall back in case listBlobsByPrefix isn't supported by the blob store + return readSnapshotList(); + } + int prefixLength = SNAPSHOT_PREFIX.length(); + int suffixLength = SNAPSHOT_SUFFIX.length(); + int legacyPrefixLength = LEGACY_SNAPSHOT_PREFIX.length(); + for (BlobMetadata md : blobs.values()) { + String blobName = md.name(); + final String name; + if (blobName.startsWith(SNAPSHOT_PREFIX) && blobName.length() > legacyPrefixLength) { + name = blobName.substring(prefixLength, blobName.length() - suffixLength); + } else if (blobName.startsWith(LEGACY_SNAPSHOT_PREFIX) && blobName.length() > suffixLength + prefixLength) { + name = blobName.substring(legacyPrefixLength); + } else { + // not sure what it was - ignore + continue; + } + snapshots.add(new SnapshotId(name, name)); + } + return Collections.unmodifiableList(snapshots); + } catch (IOException ex) { + throw new RepositoryException(repositoryName, "failed to list snapshots in repository", ex); + } + } + + protected List readSnapshotList() throws IOException { + final String repositoryName = metadata.name(); + try (InputStream blob = blobContainer().readBlob(SNAPSHOTS_INDEX_FILE)) { + final byte[] data = Streams.readFully(blob).array(); + ArrayList snapshots = new ArrayList<>(); + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + new BytesArray(data) + ) + ) { + if (parser.nextToken() == XContentParser.Token.START_OBJECT) { + if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { + String currentFieldName = parser.currentName(); + if ("snapshots".equals(currentFieldName)) { + if (parser.nextToken() == XContentParser.Token.START_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + snapshots.add(new SnapshotId(repositoryName, parser.text())); + } + } + } + } + } + } + return Collections.unmodifiableList(snapshots); + } + } + + public static class LegacyBlobStoreFormat { + + private final String blobNameFormat; + + private final CheckedBiFunction reader; + + public LegacyBlobStoreFormat(String blobNameFormat, CheckedBiFunction reader) { + this.blobNameFormat = blobNameFormat; + this.reader = reader; + } + + public T read(String repoName, BlobContainer blobContainer, String blobName, NamedXContentRegistry namedXContentRegistry) + throws IOException { + try (InputStream inputStream = blobContainer.readBlob(blobName(blobName))) { + final byte[] data = Streams.readFully(inputStream).array(); + try ( + XContentParser parser = XContentHelper.createParser( + namedXContentRegistry, + LoggingDeprecationHandler.INSTANCE, + new BytesArray(data) + ) + ) { + return reader.apply(repoName, parser); + } + } + } + + protected String blobName(String name) { + return String.format(Locale.ROOT, blobNameFormat, name); + } + } + + private final LegacyBlobStoreFormat SNAPSHOT_LEGACY_FORMAT = new LegacyBlobStoreFormat<>( + LEGACY_SNAPSHOT_NAME_FORMAT, + BlobStoreRepository::fromLegacyXContent + ); + + private final ChecksumBlobStoreFormat BASIC_INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "index-metadata", + METADATA_NAME_FORMAT, + (repoName, parser) -> basicIndexMetadataFromXContent(parser) + ); + private final LegacyBlobStoreFormat LEGACY_INDEX_METADATA_FORMAT = new LegacyBlobStoreFormat<>( + LEGACY_SNAPSHOT_NAME_FORMAT, + (repoName, parser) -> basicIndexMetadataFromXContent(parser) + ); + + public static SnapshotInfo fromLegacyXContent(final String repoName, XContentParser parser) throws IOException { + String name = null; + Version version = Version.CURRENT; + SnapshotState state = SnapshotState.IN_PROGRESS; + String reason = null; + List indices = Collections.emptyList(); + long startTime = 0; + long endTime = 0; + int totalShard = 0; + int successfulShards = 0; + List shardFailures = Collections.emptyList(); + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token + parser.nextToken(); + } + XContentParser.Token token; + if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) { + String currentFieldName = parser.currentName(); + if ("snapshot".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token.isValue()) { + if ("name".equals(currentFieldName)) { + name = parser.text(); + } else if ("state".equals(currentFieldName)) { + state = SnapshotState.valueOf(parser.text()); + } else if ("reason".equals(currentFieldName)) { + reason = parser.text(); + } else if ("start_time".equals(currentFieldName)) { + startTime = parser.longValue(); + } else if ("end_time".equals(currentFieldName)) { + endTime = parser.longValue(); + } else if ("total_shards".equals(currentFieldName)) { + totalShard = parser.intValue(); + } else if ("successful_shards".equals(currentFieldName)) { + successfulShards = parser.intValue(); + } else if ("version_id".equals(currentFieldName)) { + version = Version.fromId(parser.intValue()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("indices".equals(currentFieldName)) { + ArrayList indicesArray = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + indicesArray.add(parser.text()); + } + indices = Collections.unmodifiableList(indicesArray); + } else if ("failures".equals(currentFieldName)) { + ArrayList shardFailureArrayList = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + shardFailureArrayList.add(SnapshotShardFailure.fromXContent(parser)); + } + shardFailures = Collections.unmodifiableList(shardFailureArrayList); + } else { + // It was probably created by newer version - ignoring + parser.skipChildren(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + // It was probably created by newer version - ignoring + parser.skipChildren(); + } + } + } + } + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + return new SnapshotInfo( + new Snapshot(repoName, new SnapshotId(name, name)), + indices, + Collections.emptyList(), + Collections.emptyList(), + reason, + version, + startTime, + endTime, + totalShard, + successfulShards, + shardFailures, + false, + Collections.emptyMap(), + state, + Collections.emptyMap() + ); + } + + public static IndexMetadata basicIndexMetadataFromXContent(XContentParser parser) throws IOException { + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token + parser.nextToken(); + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); + IndexMetadata.Builder builder = new IndexMetadata.Builder(parser.currentName()); + + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("settings".equals(currentFieldName)) { + Settings settings = Settings.fromXContent(parser); + builder.settings(settings); + } else if ("mappings".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + String mappingType = currentFieldName; + Map mappingSource = MapBuilder.newMapBuilder() + .put(mappingType, parser.mapOrdered()) + .map(); + builder.putMapping(new MappingMetadata(mappingType, mappingSource)); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + } else { + // assume it's custom index metadata + parser.skipChildren(); + // parser.mapStrings(); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("mappings".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) { + assert false : "only called for newer ES versions"; + builder.putMapping(new MappingMetadata(new CompressedXContent(parser.binaryValue()))); + } else { + Map mapping = parser.mapOrdered(); + // TODO: add support for _id, _type + Map newMapping = new LinkedHashMap<>(); + newMapping.put("_meta", Map.of("original_mapping", mapping)); + // newMapping.put("runtime", createRuntimeFields(mapping)); + newMapping.put("properties", Map.of("_uid", Map.of("type", "keyword"))); + builder.putMapping(new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, newMapping)); + } + } + } else { + parser.skipChildren(); + } + } else if (token.isValue()) { + if ("state".equals(currentFieldName)) { + builder.state(IndexMetadata.State.fromString(parser.text())); + } else if ("version".equals(currentFieldName)) { + builder.version(parser.longValue()); + } else if ("mapping_version".equals(currentFieldName)) { + builder.mappingVersion(parser.longValue()); + } else if ("settings_version".equals(currentFieldName)) { + builder.settingsVersion(parser.longValue()); + } else if ("routing_num_shards".equals(currentFieldName)) { + builder.setRoutingNumShards(parser.intValue()); + } else { + // nothing + } + } else { + throw new IllegalArgumentException("Unexpected token " + token); + } + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser); + + return builder.build(); + } + + private final LegacyBlobStoreFormat LEGACY_INDEX_SHARD_SNAPSHOT_FORMAT = new LegacyBlobStoreFormat<>( + LEGACY_SNAPSHOT_NAME_FORMAT, + (repoName, parser) -> BlobStoreIndexShardSnapshot.fromXContent(parser) + ); } diff --git a/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java b/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java index 66d8effd42689..a673249a93df9 100644 --- a/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java +++ b/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java @@ -71,7 +71,15 @@ public static void main(String[] args) throws IOException { Path bin = esDir.resolve("bin").resolve("elasticsearch" + (Constants.WINDOWS ? ".bat" : "")); Path config = esDir.resolve("config").resolve("elasticsearch.yml"); - Files.write(config, Arrays.asList("http.port: 0", "transport.tcp.port: 0", "network.host: 127.0.0.1"), StandardCharsets.UTF_8); + List configOptions = new ArrayList<>(); + configOptions.addAll(Arrays.asList("http.port: 0", "transport.tcp.port: 0", "network.host: 127.0.0.1")); + if (args.length > 3) { + for (int i = 3; i < args.length; i++) { + configOptions.add(args[i]); + } + } + + Files.write(config, configOptions, StandardCharsets.UTF_8); List command = new ArrayList<>(); command.add(bin.toString()); @@ -79,7 +87,7 @@ public static void main(String[] args) throws IOException { command.add("-f"); } command.add("-p"); - command.add("../pid"); + command.add(baseDir.resolve("pid").toString()); ProcessBuilder subprocess = new ProcessBuilder(command); Process process = subprocess.start(); System.out.println("Running " + command); @@ -88,7 +96,7 @@ public static void main(String[] args) throws IOException { int port = 0; Pattern pidPattern = Pattern.compile("pid\\[(\\d+)\\]"); - Pattern httpPortPattern = Pattern.compile("\\[http\\s+\\].+bound_address.+127\\.0\\.0\\.1:(\\d+)"); + Pattern httpPortPattern = Pattern.compile("(\\[http\\s+\\]|Netty4HttpServerTransport).+bound_address.+127\\.0\\.0\\.1:(\\d+)"); try (BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { String line; while ((line = stdout.readLine()) != null && (pid == 0 || port == 0)) { @@ -101,7 +109,7 @@ public static void main(String[] args) throws IOException { } m = httpPortPattern.matcher(line); if (m.find()) { - port = Integer.parseInt(m.group(1)); + port = Integer.parseInt(m.group(2)); System.out.println("Found port: " + port); continue; } From 07e04cd38826f196ecdb01b3d24aa315a206bf2b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 23 Sep 2021 14:33:15 +0200 Subject: [PATCH 02/10] fix URL blob access --- .../repositories/blobstore/BlobStoreRepository.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 858bb9099a0fb..290a38abf88b3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -3644,6 +3644,8 @@ protected List readSnapshotList() throws IOException { } } return Collections.unmodifiableList(snapshots); + } catch (NoSuchFileException e) { + return Collections.emptyList(); } } From 3eaeea137e93bdd72b9c5139532e969378e1f474 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 27 Sep 2021 09:03:43 +0200 Subject: [PATCH 03/10] Support for more versions --- qa/repository-old-versions/build.gradle | 8 +- .../oldrepos/OldRepositoryAccessIT.java | 90 ++++++++++++++----- .../BlobStoreIndexShardSnapshot.java | 12 ++- .../src/main/java/oldes/OldElasticsearch.java | 3 +- 4 files changed, 82 insertions(+), 31 deletions(-) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index 58786eb09fed9..13d64d7dc4748 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -25,6 +25,7 @@ configurations { dependencies { oldesFixture project(':test:fixtures:old-elasticsearch') + testImplementation project(':client:rest-high-level') } jdks { @@ -41,9 +42,10 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { } else if (rootProject.rootDir.toString().contains(" ")) { logger.warn("Disabling repository-old-versions tests because Elasticsearch 1.7 won't start with spaces in the path") } else { - /* Set up tasks to unzip and run the old versions of ES before running the - * integration tests. */ - for (String versionString : ['6.8.18', '5.6.16', '2.4.5', '1.7.6']) { + /* Set up tasks to unzip and run the old versions of ES before running the integration tests. + * To avoid testing against too many old versions, always pick first and last version per major + */ + for (String versionString : ['1.0.0', '1.7.6', '2.0.0', '2.4.5', '5.0.0', '5.6.16', '6.0.0', '6.8.18']) { Version version = Version.fromString(versionString) String packageName = version.major == 1 ? 'org.elasticsearch' : 'org.elasticsearch.distribution.zip' String artifact = "${packageName}:elasticsearch:${version}@zip" diff --git a/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java index 2901d9ea87c52..b827d35b8c597 100644 --- a/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java +++ b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java @@ -10,17 +10,33 @@ import org.apache.http.HttpHost; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.client.Node; import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + public class OldRepositoryAccessIT extends ESRestTestCase { @Override protected Map>> wipeSnapshots() { @@ -32,10 +48,12 @@ public void testOldRepoAccess() throws IOException { Version oldVersion = Version.fromString(System.getProperty("tests.es.version")); int oldEsPort = Integer.parseInt(System.getProperty("tests.es.port")); - try (RestClient oldEs = RestClient.builder(new HttpHost("127.0.0.1", oldEsPort)).build()) { + try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0]))); + RestClient oldEs = RestClient.builder(new HttpHost("127.0.0.1", oldEsPort)).build()) { try { Request createIndex = new Request("PUT", "/test"); - createIndex.setJsonEntity("{\"settings\":{\"number_of_shards\": 1}}"); + int numberOfShards = randomIntBetween(1, 3); + createIndex.setJsonEntity("{\"settings\":{\"number_of_shards\": " + numberOfShards + "}}"); oldEs.performRequest(createIndex); for (int i = 0; i < 5; i++) { @@ -52,34 +70,60 @@ public void testOldRepoAccess() throws IOException { Request createSnapshotRequest = new Request("PUT", "/_snapshot/testrepo/snap1"); createSnapshotRequest.addParameter("wait_for_completion", "true"); + createSnapshotRequest.setJsonEntity("{\"indices\":\"test\"}"); oldEs.performRequest(createSnapshotRequest); // register repo on new ES - Request createReadRepoRequest = new Request("PUT", "/_snapshot/testrepo"); - createReadRepoRequest.setJsonEntity("{\"type\":\"fs\",\"settings\":{\"location\":\"" + repoLocation + - "\"}}"); - client().performRequest(createReadRepoRequest); + ElasticsearchAssertions.assertAcked(client.snapshot().createRepository( + new PutRepositoryRequest("testrepo").type("fs").settings( + Settings.builder().put("location", repoLocation).build()), RequestOptions.DEFAULT)); // list snapshots on new ES - Request listSnapshotsRequest = new Request("GET", "/_snapshot/testrepo/_all"); - listSnapshotsRequest.addParameter("error_trace", "true"); - Response listSnapshotsResponse = client().performRequest(listSnapshotsRequest); - logger.info(Streams.readFully(listSnapshotsResponse.getEntity().getContent()).utf8ToString()); - assertEquals(200, listSnapshotsResponse.getStatusLine().getStatusCode()); + List snapshotInfos = + client.snapshot().get(new GetSnapshotsRequest("testrepo").snapshots(new String[] {"_all"}), + RequestOptions.DEFAULT).getSnapshots(); + assertThat(snapshotInfos, hasSize(1)); + SnapshotInfo snapshotInfo = snapshotInfos.get(0); + assertEquals("snap1", snapshotInfo.snapshotId().getName()); + assertEquals("testrepo", snapshotInfo.repository()); + assertEquals(Arrays.asList("test"), snapshotInfo.indices()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(numberOfShards, snapshotInfo.successfulShards()); + assertEquals(numberOfShards, snapshotInfo.totalShards()); + assertEquals(0, snapshotInfo.failedShards()); + assertEquals(oldVersion, snapshotInfo.version()); // list specific snapshot on new ES - Request listSpecificSnapshotsRequest = new Request("GET", "/_snapshot/testrepo/snap1"); - listSpecificSnapshotsRequest.addParameter("error_trace", "true"); - Response listSpecificSnapshotsResponse = client().performRequest(listSnapshotsRequest); - logger.info(Streams.readFully(listSpecificSnapshotsResponse.getEntity().getContent()).utf8ToString()); - assertEquals(200, listSpecificSnapshotsResponse.getStatusLine().getStatusCode()); + snapshotInfos = + client.snapshot().get(new GetSnapshotsRequest("testrepo").snapshots(new String[] {"snap1"}), + RequestOptions.DEFAULT).getSnapshots(); + assertThat(snapshotInfos, hasSize(1)); + snapshotInfo = snapshotInfos.get(0); + assertEquals("snap1", snapshotInfo.snapshotId().getName()); + assertEquals("testrepo", snapshotInfo.repository()); + assertEquals(Arrays.asList("test"), snapshotInfo.indices()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(numberOfShards, snapshotInfo.successfulShards()); + assertEquals(numberOfShards, snapshotInfo.totalShards()); + assertEquals(0, snapshotInfo.failedShards()); + assertEquals(oldVersion, snapshotInfo.version()); + // list advanced snapshot info on new ES - Request listSnapshotStatusRequest = new Request("GET", "/_snapshot/testrepo/snap1/_status"); - listSnapshotStatusRequest.addParameter("error_trace", "true"); - Response listSnapshotStatusResponse = client().performRequest(listSnapshotStatusRequest); - logger.info(Streams.readFully(listSnapshotStatusResponse.getEntity().getContent()).utf8ToString()); - assertEquals(200, listSnapshotStatusResponse.getStatusLine().getStatusCode()); + SnapshotsStatusResponse snapshotsStatusResponse = client.snapshot().status( + new SnapshotsStatusRequest("testrepo").snapshots(new String[]{"snap1"}), + RequestOptions.DEFAULT); + assertThat(snapshotsStatusResponse.getSnapshots(), hasSize(1)); + SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0); + assertEquals("snap1", snapshotStatus.getSnapshot().getSnapshotId().getName()); + assertEquals("testrepo", snapshotStatus.getSnapshot().getRepository()); + assertEquals(Sets.newHashSet("test"), snapshotStatus.getIndices().keySet()); + assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.getState()); + assertEquals(numberOfShards, snapshotStatus.getShardsStats().getDoneShards()); + assertEquals(numberOfShards, snapshotStatus.getShardsStats().getTotalShards()); + assertEquals(0, snapshotStatus.getShardsStats().getFailedShards()); + assertThat(snapshotStatus.getStats().getTotalSize(), greaterThan(0L)); + assertThat(snapshotStatus.getStats().getTotalFileCount(), greaterThan(0)); } finally { oldEs.performRequest(new Request("DELETE", "/test")); } diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index d139e1cfff020..7398d6482b4c2 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -321,10 +321,14 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { throw new ElasticsearchParseException("missing or invalid physical file name [" + physicalName + "]"); } else if (length < 0) { throw new ElasticsearchParseException("missing or invalid file length"); - } else if (writtenBy == null) { - throw new ElasticsearchParseException("missing or invalid written_by [" + writtenBy + "]"); - } else if (checksum == null) { - throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); + } + if (writtenBy == null) { + writtenBy = "4.0.0"; // < Lucene 4.8 && >= Lucene 4.0, assume lowest version + // throw new ElasticsearchParseException("missing or invalid written_by [" + writtenBy + "]"); + } + if (checksum == null) { + checksum = "dummy"; + //throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); } return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash), partSize); } diff --git a/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java b/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java index a673249a93df9..f50344aac41f6 100644 --- a/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java +++ b/test/fixtures/old-elasticsearch/src/main/java/oldes/OldElasticsearch.java @@ -96,7 +96,8 @@ public static void main(String[] args) throws IOException { int port = 0; Pattern pidPattern = Pattern.compile("pid\\[(\\d+)\\]"); - Pattern httpPortPattern = Pattern.compile("(\\[http\\s+\\]|Netty4HttpServerTransport).+bound_address.+127\\.0\\.0\\.1:(\\d+)"); + Pattern httpPortPattern = Pattern.compile( + "(\\[http\\s+\\]|Netty4HttpServerTransport|HttpServer).+bound_address.+127\\.0\\.0\\.1:(\\d+)"); try (BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) { String line; while ((line = stdout.readLine()) != null && (pid == 0 || port == 0)) { From 7fbbfb3bf59caabf767663be05119fccb1d092b4 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 20 Oct 2021 16:34:05 +0200 Subject: [PATCH 04/10] only support 5.0+ --- qa/repository-old-versions/build.gradle | 2 +- .../cluster/metadata/IndexMetadata.java | 81 +++- .../BlobStoreIndexShardSnapshot.java | 12 +- .../repositories/RepositoryData.java | 2 +- .../blobstore/BlobStoreRepository.java | 402 +----------------- .../blobstore/ChecksumBlobStoreFormat.java | 57 ++- 6 files changed, 141 insertions(+), 415 deletions(-) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index 13d64d7dc4748..74c58dc42d170 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -45,7 +45,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { /* Set up tasks to unzip and run the old versions of ES before running the integration tests. * To avoid testing against too many old versions, always pick first and last version per major */ - for (String versionString : ['1.0.0', '1.7.6', '2.0.0', '2.4.5', '5.0.0', '5.6.16', '6.0.0', '6.8.18']) { + for (String versionString : ['5.0.0', '5.6.16', '6.0.0', '6.8.18']) { Version version = Version.fromString(versionString) String packageName = version.major == 1 ? 'org.elasticsearch' : 'org.elasticsearch.distribution.zip' String artifact = "${packageName}:elasticsearch:${version}@zip" diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index e3d6a00dfa953..4feb44227020a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -37,12 +37,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xcontent.ToXContent; -import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.gateway.MetadataStateFormat; @@ -52,6 +47,11 @@ import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.time.Instant; @@ -677,7 +677,7 @@ public IndexLongFieldRange getTimestampRange() { return timestampRange; } - + @Override public boolean equals(Object o) { @@ -1638,6 +1638,75 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti } return builder.build(); } + + /** + * Used to load legacy metadata from ES versions that are no longer index-compatible. + * Returns information on best-effort basis. + * Throws an exception if the metadata is index-compatible with the current version (in that case, + * {@link #fromXContent} should be used to load the content. + */ + public static IndexMetadata legacyFromXContent(XContentParser parser) throws IOException { + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token + parser.nextToken(); + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); + Builder builder = new Builder(parser.currentName()); + + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("settings".equals(currentFieldName)) { + Settings settings = Settings.fromXContent(parser); + if (SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.CURRENT.minimumIndexCompatibilityVersion())) { + throw new IllegalStateException("this method should only be used to parse older index metadata versions " + + "but got " + SETTING_INDEX_VERSION_CREATED.get(settings)); + } + builder.settings(settings); + } else if ("mappings".equals(currentFieldName)) { + // don't try to parse these for now + parser.skipChildren(); + } else { + // assume it's custom index metadata + parser.skipChildren(); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("mappings".equals(currentFieldName)) { + // don't try to parse these for now + parser.skipChildren(); + } else { + parser.skipChildren(); + } + } else if (token.isValue()) { + if ("state".equals(currentFieldName)) { + builder.state(State.fromString(parser.text())); + } else if ("version".equals(currentFieldName)) { + builder.version(parser.longValue()); + } else if ("mapping_version".equals(currentFieldName)) { + builder.mappingVersion(parser.longValue()); + } else if ("settings_version".equals(currentFieldName)) { + builder.settingsVersion(parser.longValue()); + } else if ("routing_num_shards".equals(currentFieldName)) { + builder.setRoutingNumShards(parser.intValue()); + } else { + // unknown, ignore + } + } else { + throw new IllegalArgumentException("Unexpected token " + token); + } + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser); + + IndexMetadata indexMetadata = builder.build(); + assert indexMetadata.getCreationVersion().before(Version.CURRENT.minimumIndexCompatibilityVersion()); + return indexMetadata; + } } /** diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index 3dc0b3f0a6f64..1f79458cdfd7e 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -338,14 +338,10 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException { throw new ElasticsearchParseException("missing or invalid physical file name [" + physicalName + "]"); } else if (length < 0) { throw new ElasticsearchParseException("missing or invalid file length"); - } - if (writtenBy == null) { - writtenBy = "4.0.0"; // < Lucene 4.8 && >= Lucene 4.0, assume lowest version - // throw new ElasticsearchParseException("missing or invalid written_by [" + writtenBy + "]"); - } - if (checksum == null) { - checksum = "dummy"; - //throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); + } else if (writtenBy == null) { + throw new ElasticsearchParseException("missing or invalid written_by [" + writtenBy + "]"); + } else if (checksum == null) { + throw new ElasticsearchParseException("missing checksum for name [" + name + "]"); } return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index f96686e011de4..985b4408a00d3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -42,7 +42,7 @@ * A class that represents the data in a repository, as captured in the * repository's index blob. */ -public class RepositoryData { +public final class RepositoryData { /** * The generation value indicating the repository has no index generational files. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 41bda829f6e48..a9d63596f108d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,7 +22,6 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -39,14 +38,12 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -58,9 +55,7 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.Lucene; @@ -75,13 +70,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; @@ -114,19 +106,14 @@ import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.AbortedSnapshotException; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; -import org.elasticsearch.snapshots.SnapshotShardFailure; -import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.DeprecationHandler; import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; @@ -142,9 +129,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -290,6 +275,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, + (repoName, parser) -> IndexMetadata.Builder.legacyFromXContent(parser), (repoName, parser) -> IndexMetadata.fromXContent(parser) ); @@ -1538,25 +1524,9 @@ private void getOneSnapshotInfo(BlockingQueue queue, GetSnapshotInfo Exception failure = null; SnapshotInfo snapshotInfo = null; try { - try { - snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry); - } catch (NoSuchFileException ex) { - // try legacy format - if (true /* legacy */) { - try { - snapshotInfo = SNAPSHOT_LEGACY_FORMAT.read( - metadata.name(), - blobContainer(), - snapshotId.getName(), - namedXContentRegistry - ); - } catch (NoSuchFileException ex2) { - failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); - } - } else { - failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); - } - } + snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException | NotXContentException ex) { failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshot info" + snapshotId, ex); } catch (Exception e) { @@ -1591,33 +1561,12 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { @Override public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { - IndexMetadata indexMetadata; - // read index metadata in compatibility mode first to figure out what its version is - try { - indexMetadata = BASIC_INDEX_METADATA_FORMAT.read( - metadata.name(), - indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), - namedXContentRegistry - ); - } catch (NoSuchFileException ex2) { - indexMetadata = LEGACY_INDEX_METADATA_FORMAT.read( - metadata.name(), - indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), - namedXContentRegistry - ); - } - if (indexMetadata.getCreationVersion().onOrAfter(Version.CURRENT.minimumIndexCompatibilityVersion())) { - return INDEX_METADATA_FORMAT.read( - metadata.name(), - indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), - namedXContentRegistry - ); - } else { - return indexMetadata; - } + return INDEX_METADATA_FORMAT.read( + metadata.name(), + indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), + namedXContentRegistry + ); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -2137,26 +2086,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS private RepositoryData getRepositoryData(long indexGen) { if (indexGen == RepositoryData.EMPTY_REPO_GEN) { - // fall back to loading legacy snapshots - final List snapshotIds = snapshots(); - if (snapshotIds.isEmpty() == false) { - return new RepositoryData( - RepositoryData.MISSING_UUID, - RepositoryData.UNKNOWN_REPO_GEN, - snapshotIds.stream().collect(Collectors.toMap(s -> s.getUUID(), s -> s)), - Collections.emptyMap(), - Collections.emptyMap(), - ShardGenerations.EMPTY, - IndexMetaDataGenerations.EMPTY, - RepositoryData.MISSING_UUID - ) { - - @Override - public IndexId resolveIndexId(final String indexName) { - return new IndexId(indexName, indexName); - } - }; - } return RepositoryData.EMPTY; } try { @@ -3413,21 +3342,9 @@ private static List unusedBlobs( */ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { - try { - return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry); - } catch (NoSuchFileException ex) { - // fallback to ES 1.x legacy format - try { - return LEGACY_INDEX_SHARD_SNAPSHOT_FORMAT.read( - metadata.name(), - shardContainer, - snapshotId.getUUID(), - namedXContentRegistry - ); - } catch (NoSuchFileException ex2) { - throw new SnapshotMissingException(metadata.name(), snapshotId, ex); - } - } + return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { throw new SnapshotException( metadata.name(), @@ -3614,297 +3531,4 @@ private static final class ShardSnapshotMetaDeleteResult { this.blobsToDelete = blobsToDelete; } } - - // Legacy blob store access - private static final String SNAPSHOT_SUFFIX = ".dat"; - private static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; - private static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; - private static final String COMMON_SNAPSHOT_PREFIX = "snap"; - private static final String SNAPSHOTS_INDEX_FILE = "index"; - - public List snapshots() { - final String repositoryName = metadata.name(); - try { - List snapshots = new ArrayList<>(); - Map blobs; - try { - blobs = blobContainer().listBlobsByPrefix(COMMON_SNAPSHOT_PREFIX); - } catch (UnsupportedOperationException ex) { - // Fall back in case listBlobsByPrefix isn't supported by the blob store - return readSnapshotList(); - } - int prefixLength = SNAPSHOT_PREFIX.length(); - int suffixLength = SNAPSHOT_SUFFIX.length(); - int legacyPrefixLength = LEGACY_SNAPSHOT_PREFIX.length(); - for (BlobMetadata md : blobs.values()) { - String blobName = md.name(); - final String name; - if (blobName.startsWith(SNAPSHOT_PREFIX) && blobName.length() > legacyPrefixLength) { - name = blobName.substring(prefixLength, blobName.length() - suffixLength); - } else if (blobName.startsWith(LEGACY_SNAPSHOT_PREFIX) && blobName.length() > suffixLength + prefixLength) { - name = blobName.substring(legacyPrefixLength); - } else { - // not sure what it was - ignore - continue; - } - snapshots.add(new SnapshotId(name, name)); - } - return Collections.unmodifiableList(snapshots); - } catch (IOException ex) { - throw new RepositoryException(repositoryName, "failed to list snapshots in repository", ex); - } - } - - protected List readSnapshotList() throws IOException { - final String repositoryName = metadata.name(); - try (InputStream blob = blobContainer().readBlob(SNAPSHOTS_INDEX_FILE)) { - final byte[] data = Streams.readFully(blob).array(); - ArrayList snapshots = new ArrayList<>(); - try ( - XContentParser parser = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.IGNORE_DEPRECATIONS, - new BytesArray(data) - ) - ) { - if (parser.nextToken() == XContentParser.Token.START_OBJECT) { - if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { - String currentFieldName = parser.currentName(); - if ("snapshots".equals(currentFieldName)) { - if (parser.nextToken() == XContentParser.Token.START_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - snapshots.add(new SnapshotId(repositoryName, parser.text())); - } - } - } - } - } - } - return Collections.unmodifiableList(snapshots); - } catch (NoSuchFileException e) { - return Collections.emptyList(); - } - } - - public static class LegacyBlobStoreFormat { - - private final String blobNameFormat; - - private final CheckedBiFunction reader; - - public LegacyBlobStoreFormat(String blobNameFormat, CheckedBiFunction reader) { - this.blobNameFormat = blobNameFormat; - this.reader = reader; - } - - public T read(String repoName, BlobContainer blobContainer, String blobName, NamedXContentRegistry namedXContentRegistry) - throws IOException { - try (InputStream inputStream = blobContainer.readBlob(blobName(blobName))) { - final byte[] data = Streams.readFully(inputStream).array(); - try ( - XContentParser parser = XContentHelper.createParser( - namedXContentRegistry, - LoggingDeprecationHandler.INSTANCE, - new BytesArray(data) - ) - ) { - return reader.apply(repoName, parser); - } - } - } - - protected String blobName(String name) { - return String.format(Locale.ROOT, blobNameFormat, name); - } - } - - private final LegacyBlobStoreFormat SNAPSHOT_LEGACY_FORMAT = new LegacyBlobStoreFormat<>( - LEGACY_SNAPSHOT_NAME_FORMAT, - BlobStoreRepository::fromLegacyXContent - ); - - private final ChecksumBlobStoreFormat BASIC_INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "index-metadata", - METADATA_NAME_FORMAT, - (repoName, parser) -> basicIndexMetadataFromXContent(parser) - ); - private final LegacyBlobStoreFormat LEGACY_INDEX_METADATA_FORMAT = new LegacyBlobStoreFormat<>( - LEGACY_SNAPSHOT_NAME_FORMAT, - (repoName, parser) -> basicIndexMetadataFromXContent(parser) - ); - - public static SnapshotInfo fromLegacyXContent(final String repoName, XContentParser parser) throws IOException { - String name = null; - Version version = Version.CURRENT; - SnapshotState state = SnapshotState.IN_PROGRESS; - String reason = null; - List indices = Collections.emptyList(); - long startTime = 0; - long endTime = 0; - int totalShard = 0; - int successfulShards = 0; - List shardFailures = Collections.emptyList(); - if (parser.currentToken() == null) { // fresh parser? move to the first token - parser.nextToken(); - } - if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token - parser.nextToken(); - } - XContentParser.Token token; - if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) { - String currentFieldName = parser.currentName(); - if ("snapshot".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - token = parser.nextToken(); - if (token.isValue()) { - if ("name".equals(currentFieldName)) { - name = parser.text(); - } else if ("state".equals(currentFieldName)) { - state = SnapshotState.valueOf(parser.text()); - } else if ("reason".equals(currentFieldName)) { - reason = parser.text(); - } else if ("start_time".equals(currentFieldName)) { - startTime = parser.longValue(); - } else if ("end_time".equals(currentFieldName)) { - endTime = parser.longValue(); - } else if ("total_shards".equals(currentFieldName)) { - totalShard = parser.intValue(); - } else if ("successful_shards".equals(currentFieldName)) { - successfulShards = parser.intValue(); - } else if ("version_id".equals(currentFieldName)) { - version = Version.fromId(parser.intValue()); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if ("indices".equals(currentFieldName)) { - ArrayList indicesArray = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - indicesArray.add(parser.text()); - } - indices = Collections.unmodifiableList(indicesArray); - } else if ("failures".equals(currentFieldName)) { - ArrayList shardFailureArrayList = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - shardFailureArrayList.add(SnapshotShardFailure.fromXContent(parser)); - } - shardFailures = Collections.unmodifiableList(shardFailureArrayList); - } else { - // It was probably created by newer version - ignoring - parser.skipChildren(); - } - } else if (token == XContentParser.Token.START_OBJECT) { - // It was probably created by newer version - ignoring - parser.skipChildren(); - } - } - } - } - } else { - throw new ElasticsearchParseException("unexpected token [" + token + "]"); - } - return new SnapshotInfo( - new Snapshot(repoName, new SnapshotId(name, name)), - indices, - Collections.emptyList(), - Collections.emptyList(), - reason, - version, - startTime, - endTime, - totalShard, - successfulShards, - shardFailures, - false, - Collections.emptyMap(), - state, - Collections.emptyMap() - ); - } - - public static IndexMetadata basicIndexMetadataFromXContent(XContentParser parser) throws IOException { - if (parser.currentToken() == null) { // fresh parser? move to the first token - parser.nextToken(); - } - if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token - parser.nextToken(); - } - XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); - IndexMetadata.Builder builder = new IndexMetadata.Builder(parser.currentName()); - - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser); - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - if ("settings".equals(currentFieldName)) { - Settings settings = Settings.fromXContent(parser); - builder.settings(settings); - } else if ("mappings".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - String mappingType = currentFieldName; - Map mappingSource = MapBuilder.newMapBuilder() - .put(mappingType, parser.mapOrdered()) - .map(); - builder.putMapping(new MappingMetadata(mappingType, mappingSource)); - } else { - throw new IllegalArgumentException("Unexpected token: " + token); - } - } - } else { - // assume it's custom index metadata - parser.skipChildren(); - // parser.mapStrings(); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if ("mappings".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) { - assert false : "only called for newer ES versions"; - builder.putMapping(new MappingMetadata(new CompressedXContent(parser.binaryValue()))); - } else { - Map mapping = parser.mapOrdered(); - // TODO: add support for _id, _type - Map newMapping = new LinkedHashMap<>(); - newMapping.put("_meta", Map.of("original_mapping", mapping)); - // newMapping.put("runtime", createRuntimeFields(mapping)); - newMapping.put("properties", Map.of("_uid", Map.of("type", "keyword"))); - builder.putMapping(new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, newMapping)); - } - } - } else { - parser.skipChildren(); - } - } else if (token.isValue()) { - if ("state".equals(currentFieldName)) { - builder.state(IndexMetadata.State.fromString(parser.text())); - } else if ("version".equals(currentFieldName)) { - builder.version(parser.longValue()); - } else if ("mapping_version".equals(currentFieldName)) { - builder.mappingVersion(parser.longValue()); - } else if ("settings_version".equals(currentFieldName)) { - builder.settingsVersion(parser.longValue()); - } else if ("routing_num_shards".equals(currentFieldName)) { - builder.setRoutingNumShards(parser.intValue()); - } else { - // nothing - } - } else { - throw new IllegalArgumentException("Unexpected token " + token); - } - } - XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser); - - return builder.build(); - } - - private final LegacyBlobStoreFormat LEGACY_INDEX_SHARD_SNAPSHOT_FORMAT = new LegacyBlobStoreFormat<>( - LEGACY_SNAPSHOT_NAME_FORMAT, - (repoName, parser) -> BlobStoreIndexShardSnapshot.fromXContent(parser) - ); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index a2918f6bda00c..0d02ec3edfcb1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -19,11 +19,13 @@ import org.elasticsearch.common.Numbers; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentParserUtils; +import org.elasticsearch.core.Nullable; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ToXContent; @@ -32,6 +34,7 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; +import java.io.ByteArrayInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -64,15 +67,29 @@ public final class ChecksumBlobStoreFormat { private final CheckedBiFunction reader; + private final CheckedBiFunction fallbackReader; + /** - * @param codec codec name + * @param codec code name * @param blobNameFormat format of the blobname in {@link String#format} format * @param reader prototype object that can deserialize T from XContent + * @param fallbackReader fallback prototype object that can deserialize T from XContent in case reader fails */ - public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedBiFunction reader) { + public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedBiFunction reader, + @Nullable CheckedBiFunction fallbackReader) { this.reader = reader; this.blobNameFormat = blobNameFormat; this.codec = codec; + this.fallbackReader = fallbackReader; + } + + /** + * @param codec codec name + * @param blobNameFormat format of the blobname in {@link String#format} format + * @param reader prototype object that can deserialize T from XContent + */ + public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedBiFunction reader) { + this(codec, blobNameFormat, reader, null); } /** @@ -104,15 +121,35 @@ public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistr } else { wrappedStream = deserializeMetaBlobInputStream; } - final T result; - try ( - XContentParser parser = XContentType.SMILE.xContent() - .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream) - ) { - result = reader.apply(repoName, parser); - XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); + T result; + + if (fallbackReader != null) { + // fully read stream to allow rereading it when calling fallback + BytesReference bytesReference = Streams.readFully(wrappedStream); + deserializeMetaBlobInputStream.verifyFooter(); + try ( + XContentParser parser = XContentType.SMILE.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference.streamInput()) + ) { + result = reader.apply(repoName, parser); + XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); + } catch (Exception e) { + try (XContentParser parser = XContentType.SMILE.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference.streamInput())) { + result = fallbackReader.apply(repoName, parser); + XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); + } + } + } else { + try ( + XContentParser parser = XContentType.SMILE.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream) + ) { + result = reader.apply(repoName, parser); + XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); + } + deserializeMetaBlobInputStream.verifyFooter(); } - deserializeMetaBlobInputStream.verifyFooter(); return result; } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we trick this into a dedicated exception with the original stacktrace From 07b2500774fd5cf7bc91d5ba9971fa29eedd4c37 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 20 Oct 2021 17:09:44 +0200 Subject: [PATCH 05/10] fix --- qa/repository-old-versions/build.gradle | 2 +- .../blobstore/ChecksumBlobStoreFormat.java | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index 74c58dc42d170..503b5a3224adb 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -47,7 +47,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { */ for (String versionString : ['5.0.0', '5.6.16', '6.0.0', '6.8.18']) { Version version = Version.fromString(versionString) - String packageName = version.major == 1 ? 'org.elasticsearch' : 'org.elasticsearch.distribution.zip' + String packageName = 'org.elasticsearch.distribution.zip' String artifact = "${packageName}:elasticsearch:${version}@zip" String versionNoDots = version.toString().replace('.', '_') String configName = "es${versionNoDots}" diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 0d02ec3edfcb1..f58fd53faf364 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -34,7 +34,6 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentType; -import java.io.ByteArrayInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -75,8 +74,12 @@ public final class ChecksumBlobStoreFormat { * @param reader prototype object that can deserialize T from XContent * @param fallbackReader fallback prototype object that can deserialize T from XContent in case reader fails */ - public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedBiFunction reader, - @Nullable CheckedBiFunction fallbackReader) { + public ChecksumBlobStoreFormat( + String codec, + String blobNameFormat, + CheckedBiFunction reader, + @Nullable CheckedBiFunction fallbackReader + ) { this.reader = reader; this.blobNameFormat = blobNameFormat; this.codec = codec; @@ -134,8 +137,10 @@ public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistr result = reader.apply(repoName, parser); XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); } catch (Exception e) { - try (XContentParser parser = XContentType.SMILE.xContent() - .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference.streamInput())) { + try ( + XContentParser parser = XContentType.SMILE.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference.streamInput()) + ) { result = fallbackReader.apply(repoName, parser); XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser); } From ef5edc775538336a183fccd03bc43da92f3bd6a3 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 20 Oct 2021 17:13:30 +0200 Subject: [PATCH 06/10] fix javadoc --- .../repositories/blobstore/ChecksumBlobStoreFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index f58fd53faf364..2cc42f0993a9e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -69,7 +69,7 @@ public final class ChecksumBlobStoreFormat { private final CheckedBiFunction fallbackReader; /** - * @param codec code name + * @param codec codec name * @param blobNameFormat format of the blobname in {@link String#format} format * @param reader prototype object that can deserialize T from XContent * @param fallbackReader fallback prototype object that can deserialize T from XContent in case reader fails From 3c1c8a9bf68cbd73df41406916da0c7c72fa250c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 21 Oct 2021 14:26:46 +0200 Subject: [PATCH 07/10] no 1.0 --- qa/repository-old-versions/build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index 503b5a3224adb..f2cb79037f2f8 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -39,8 +39,6 @@ jdks { if (Os.isFamily(Os.FAMILY_WINDOWS)) { logger.warn("Disabling repository-old-versions tests because we can't get the pid file on windows") -} else if (rootProject.rootDir.toString().contains(" ")) { - logger.warn("Disabling repository-old-versions tests because Elasticsearch 1.7 won't start with spaces in the path") } else { /* Set up tasks to unzip and run the old versions of ES before running the integration tests. * To avoid testing against too many old versions, always pick first and last version per major From d31dd0e57fefa4afb0c7d8bebd4290f1e1991841 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 21 Oct 2021 16:14:58 +0200 Subject: [PATCH 08/10] wait longer --- qa/repository-old-versions/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index f2cb79037f2f8..2453755d432c9 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -93,6 +93,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { unzip.get().temporaryDir, false, "path.repo: ${repoLocation}" + maxWaitInSeconds 60 waitCondition = { fixture, ant -> // the fixture writes the ports file when Elasticsearch's HTTP service // is ready, so we can just wait for the file to exist From a1239212be9afcfab9d883a4de6b2da3068d5d38 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 26 Oct 2021 12:22:06 +0200 Subject: [PATCH 09/10] build-related review feedback --- qa/repository-old-versions/build.gradle | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/qa/repository-old-versions/build.gradle b/qa/repository-old-versions/build.gradle index 2453755d432c9..bfdb7082586ef 100644 --- a/qa/repository-old-versions/build.gradle +++ b/qa/repository-old-versions/build.gradle @@ -43,7 +43,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { /* Set up tasks to unzip and run the old versions of ES before running the integration tests. * To avoid testing against too many old versions, always pick first and last version per major */ - for (String versionString : ['5.0.0', '5.6.16', '6.0.0', '6.8.18']) { + for (String versionString : ['5.0.0', '5.6.16', '6.0.0', '6.8.20']) { Version version = Version.fromString(versionString) String packageName = 'org.elasticsearch.distribution.zip' String artifact = "${packageName}:elasticsearch:${version}@zip" @@ -70,11 +70,9 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { String clusterName = versionNoDots - testClusters { - "${clusterName}" { - setting 'path.repo', repoLocation - setting 'xpack.security.enabled', 'false' - } + def testClusterProvider = testClusters.register(clusterName) { + setting 'path.repo', repoLocation + setting 'xpack.security.enabled', 'false' } TaskProvider fixture = tasks.register("oldES${versionNoDots}Fixture", AntFixture) { @@ -102,7 +100,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { } tasks.register("javaRestTest#${versionNoDots}", StandaloneRestIntegTestTask) { - useCluster testClusters."${clusterName}" + useCluster testClusterProvider dependsOn fixture doFirst { delete(repoLocation) @@ -114,8 +112,8 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { * run the integration tests so that we can be sure that the file is * ready. */ nonInputProperties.systemProperty "tests.es.port", "${-> fixture.get().addressAndPort}" - nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}") - nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}") + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusterProvider.get().allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusterProvider.get().getName()}") } tasks.named("check").configure { From b0fb447b08b353851f2ca6cd1a140c29679c90b6 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 26 Oct 2021 13:32:28 +0200 Subject: [PATCH 10/10] review feedback --- .../java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java | 1 + .../java/org/elasticsearch/cluster/metadata/IndexMetadata.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java index 679260bb98488..140ef92f9507f 100644 --- a/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java +++ b/qa/repository-old-versions/src/test/java/org/elasticsearch/oldrepos/OldRepositoryAccessIT.java @@ -43,6 +43,7 @@ public class OldRepositoryAccessIT extends ESRestTestCase { return Collections.emptyMap(); } + @SuppressWarnings("removal") public void testOldRepoAccess() throws IOException { String repoLocation = System.getProperty("tests.repo.location"); Version oldVersion = Version.fromString(System.getProperty("tests.es.version")); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index afd76f60be70c..e0147354e98cb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -1698,7 +1698,7 @@ public static IndexMetadata legacyFromXContent(XContentParser parser) throws IOE // unknown, ignore } } else { - throw new IllegalArgumentException("Unexpected token " + token); + XContentParserUtils.throwUnknownToken(token, parser.getTokenLocation()); } } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);