diff --git a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java index 59ad749f638ee..abd0e593a9f18 100644 --- a/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java +++ b/server/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java @@ -19,11 +19,13 @@ package org.elasticsearch.index.store; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import java.io.IOException; import java.text.ParseException; @@ -100,6 +102,29 @@ public String checksum() { return this.checksum; } + /** + * Checks if the bytes returned by {@link #hash()} are the contents of the file that this instances refers to. + * + * @return {@code true} iff {@link #hash()} will return the actual file contents + */ + public boolean hashEqualsContents() { + if (hash.length == length) { + try { + final boolean checksumsMatch = Store.digestToString(CodecUtil.retrieveChecksum( + new ByteArrayIndexInput("store_file", hash.bytes, hash.offset, hash.length))).equals(checksum); + assert checksumsMatch : "Checksums did not match for [" + this + "] which has a hash of [" + hash + "]"; + return checksumsMatch; + } catch (Exception e) { + // Hash didn't contain any bytes that Lucene could extract a checksum from so we can't verify against the checksum of the + // original file. We should never see an exception here because lucene files are assumed to always contain the checksum + // footer. + assert false : new AssertionError("Saw exception for hash [" + hash + "] but expected it to be Lucene file", e); + return false; + } + } + return false; + } + /** * Returns true iff the length and the checksums are the same. otherwise false */ diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fe662ce5b09ce..053a832604fc6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -179,7 +180,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String SNAPSHOT_INDEX_CODEC = "snapshots"; - private static final String DATA_BLOB_PREFIX = "__"; + private static final String UPLOADED_DATA_BLOB_PREFIX = "__"; + + /** + * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are + * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and + * {@link BlobStoreIndexShardSnapshots}. This is the case for files for which {@link StoreFileMetaData#hashEqualsContents()} is + * {@code true}. + */ + private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__"; /** * When set to true metadata files are stored in compressed format. This setting doesn’t affect index @@ -1516,6 +1525,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } } + // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents + // directly in the shard level metadata in this case + final boolean needsWrite = md.hashEqualsContents() == false; indexTotalFileCount += md.length(); indexTotalNumberOfFiles++; @@ -1524,9 +1536,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s indexIncrementalSize += md.length(); // create a new FileInfo BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = - new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize()); + new BlobStoreIndexShardSnapshot.FileInfo( + (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(), + md, chunkSize()); indexCommitPointFiles.add(snapshotFileInfo); - filesToSnapshot.add(snapshotFileInfo); + if (needsWrite) { + filesToSnapshot.add(snapshotFileInfo); + } + assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store); } else { indexCommitPointFiles.add(existingFileInfo); } @@ -1535,8 +1552,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount, indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount); - assert indexIncrementalFileCount == filesToSnapshot.size(); - final StepListener> allFilesUploadedListener = new StepListener<>(); allFilesUploadedListener.whenComplete(v -> { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = @@ -1625,6 +1640,17 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } } + private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) { + try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) { + final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())]; + indexInput.readBytes(tmp, 0, tmp.length); + assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp)); + } catch (IOException e) { + throw new AssertionError(e); + } + return true; + } + @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, ActionListener listener) { @@ -1668,38 +1694,42 @@ protected void restoreFiles(List filesToRe private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { boolean success = false; - - try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); - } - }, - restoreRateLimiter, restoreRateLimitingTimeInNanos)) { - try (IndexOutput indexOutput = - store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[BUFFER_SIZE]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); + try (IndexOutput indexOutput = + store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + if (fileInfo.name().startsWith(VIRTUAL_DATA_BLOB_PREFIX)) { + final BytesRef hash = fileInfo.metadata().hash(); + indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length); + } else { + try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }, restoreRateLimiter, restoreRateLimitingTimeInNanos)) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); + } } } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } } } }.restore(snapshotFiles, store, l); @@ -1830,7 +1860,7 @@ private static List unusedBlobs(Set blobs, Set surviving || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat") && survivingSnapshotUUIDs.contains( blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false) - || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) + || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList()); } @@ -1884,7 +1914,7 @@ private Tuple buildBlobStoreIndexShardSnapsh final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest)); return new Tuple<>(shardSnapshots, latest); } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX) - || b.startsWith(DATA_BLOB_PREFIX))) { + || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) { throw new IllegalStateException( "Could not find a readable index-N file in a non-empty shard snapshot directory [" + shardContainer.path() + "]"); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index ab5aebe853b9e..0296a25e44332 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -1132,11 +1132,11 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { SnapshotStats stats = snapshots.get(0).getStats(); - assertThat(stats.getTotalFileCount(), is(snapshot0FileCount)); - assertThat(stats.getTotalSize(), is(snapshot0FileSize)); + assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount)); + assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize)); - assertThat(stats.getIncrementalFileCount(), equalTo(snapshot0FileCount)); - assertThat(stats.getIncrementalSize(), equalTo(snapshot0FileSize)); + assertThat(stats.getIncrementalFileCount(), equalTo(stats.getTotalFileCount())); + assertThat(stats.getIncrementalSize(), equalTo(stats.getTotalSize())); assertThat(stats.getIncrementalFileCount(), equalTo(stats.getProcessedFileCount())); assertThat(stats.getIncrementalSize(), equalTo(stats.getProcessedSize())); @@ -1175,8 +1175,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { ArrayList snapshotFilesDiff = new ArrayList<>(snapshot1Files); snapshotFilesDiff.removeAll(snapshot0Files); - assertThat(anotherStats.getIncrementalFileCount(), equalTo(snapshotFilesDiff.size())); - assertThat(anotherStats.getIncrementalSize(), equalTo(calculateTotalFilesSize(snapshotFilesDiff))); + assertThat(anotherStats.getIncrementalFileCount(), greaterThanOrEqualTo(snapshotFilesDiff.size())); + assertThat(anotherStats.getIncrementalSize(), greaterThanOrEqualTo(calculateTotalFilesSize(snapshotFilesDiff))); assertThat(anotherStats.getIncrementalFileCount(), equalTo(anotherStats.getProcessedFileCount())); assertThat(anotherStats.getIncrementalSize(), equalTo(anotherStats.getProcessedSize())); @@ -1184,8 +1184,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { assertThat(stats.getTotalSize(), lessThan(anotherStats.getTotalSize())); assertThat(stats.getTotalFileCount(), lessThan(anotherStats.getTotalFileCount())); - assertThat(anotherStats.getTotalFileCount(), is(snapshot1FileCount)); - assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize)); + assertThat(anotherStats.getTotalFileCount(), greaterThanOrEqualTo(snapshot1FileCount)); + assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize)); } public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index d74f99b9b9326..67a4ddd177087 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1035,7 +1035,8 @@ public void testUnrestorableFilesDuringRestore() throws Exception { final String indexName = "unrestorable-files"; final int maxRetries = randomIntBetween(1, 10); - Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build(); + Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).build(); Settings repositorySettings = Settings.builder() .put("random", randomAlphaOfLength(10))