@@ -19,11 +19,13 @@

package org.elasticsearch.index.store;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;

import java.io.IOException;
import java.text.ParseException;
@@ -100,6 +102,29 @@ public String checksum() {
return this.checksum;
}

/**
* Checks if the bytes returned by {@link #hash()} are the contents of the file that this instance refers to.
*
* @return {@code true} iff {@link #hash()} will return the actual file contents
*/
public boolean hashEqualsContents() {
if (hash.length == length) {
try {
final boolean checksumsMatch = Store.digestToString(CodecUtil.retrieveChecksum(
new ByteArrayIndexInput("store_file", hash.bytes, hash.offset, hash.length))).equals(checksum);
assert checksumsMatch : "Checksums did not match for [" + this + "] which has a hash of [" + hash + "]";
return checksumsMatch;
} catch (Exception e) {
// Hash didn't contain any bytes that Lucene could extract a checksum from, so we can't verify against the checksum of the
// original file. We should never see an exception here because Lucene files are assumed to always contain the checksum
// footer.
assert false : new AssertionError("Saw exception for hash [" + hash + "] but expected it to be a Lucene file", e);
return false;
}
}
return false;
}

/**
* Returns <code>true</code> iff the length and the checksums are the same, otherwise <code>false</code>
*/
@@ -30,6 +30,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@@ -179,7 +180,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String SNAPSHOT_INDEX_CODEC = "snapshots";

private static final String DATA_BLOB_PREFIX = "__";
private static final String UPLOADED_DATA_BLOB_PREFIX = "__";

/**
* Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
* already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
* {@link BlobStoreIndexShardSnapshots}. This is the case for files for which {@link StoreFileMetaData#hashEqualsContents()} is
* {@code true}.
*/
private static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
Contributor Author:

Still need to have the virtual blob prefix here so that the logic/assertions in BlobStoreIndexShardSnapshot(s) work without a format change and things stay BwC. Technically we could do without this kind of "ID", but then we'd have to go through the whole dance of only updating the format and not uploading the blob once all the snapshots are newer than version X. If we do it this way, we get the benefit of faster restores and snapshots right away, since the meta hash works the same way in 6.x already; hence even in 7.7 all possible restores should work out.

Contributor:
Can you add some Javadocs here explaining what virtual data blobs are?
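
As a rough sketch of the naming scheme discussed in this thread, the choice between the two prefixes comes down to the new metadata check. The helper below is hypothetical (the real logic lives inline in snapshotShard further down in this diff), but the constants and UUIDs.randomBase64UUID() are the ones used there:

// Hypothetical helper, not part of this change: picks the blob name for a file.
// Files whose metadata hash already contains the full file contents are never
// uploaded as separate blobs, so they only receive a "virtual" identifier.
private static String dataBlobName(StoreFileMetaData md) {
    final String prefix = md.hashEqualsContents() ? VIRTUAL_DATA_BLOB_PREFIX : UPLOADED_DATA_BLOB_PREFIX;
    return prefix + UUIDs.randomBase64UUID();
}

On restore, a name starting with VIRTUAL_DATA_BLOB_PREFIX tells restoreFile(...) below to rebuild the file from the hash bytes stored in the shard-level metadata instead of reading a blob from the repository.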


/**
* When set to true metadata files are stored in compressed format. This setting doesn’t affect index
@@ -1516,6 +1525,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
}

// We can skip writing blobs where the metadata hash is equal to the blob's contents, because in that case we store the
// hash/contents directly in the shard-level metadata
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileCount += md.length();
indexTotalNumberOfFiles++;

@@ -1524,9 +1536,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
}
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
@@ -1535,8 +1552,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);

assert indexIncrementalFileCount == filesToSnapshot.size();

final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
@@ -1625,6 +1640,17 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
}

private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
indexInput.readBytes(tmp, 0, tmp.length);
assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp));
} catch (IOException e) {
throw new AssertionError(e);
}
return true;
}

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState, ActionListener<Void> listener) {
@@ -1668,38 +1694,42 @@ protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRe

private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
boolean success = false;

try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
}
},
restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
try (IndexOutput indexOutput =
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
try (IndexOutput indexOutput =
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
if (fileInfo.name().startsWith(VIRTUAL_DATA_BLOB_PREFIX)) {
final BytesRef hash = fileInfo.metadata().hash();
indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length);
} else {
try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
}
}, restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
}
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}.restore(snapshotFiles, store, l);
@@ -1830,7 +1860,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
|| (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
&& survivingSnapshotUUIDs.contains(
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
|| (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|| (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
}

@@ -1884,7 +1914,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
final BlobStoreIndexShardSnapshots shardSnapshots = indexShardSnapshotsFormat.read(shardContainer, Long.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
|| b.startsWith(DATA_BLOB_PREFIX))) {
|| b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
throw new IllegalStateException(
"Could not find a readable index-N file in a non-empty shard snapshot directory [" + shardContainer.path() + "]");
}
@@ -1132,11 +1132,11 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {

SnapshotStats stats = snapshots.get(0).getStats();

assertThat(stats.getTotalFileCount(), is(snapshot0FileCount));
assertThat(stats.getTotalSize(), is(snapshot0FileSize));
assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
Contributor Author:
This simply doesn't match up anymore, and I figured what really matters in these tests is the consistency of the numbers. I didn't want to adjust the expected file counts to filter out the files that weren't physically uploaded, since technically they are still uploaded as part of the metadata.
We have a bunch of other tests that verify all the files are there and that incrementality works as expected, so I figured this adjustment is good enough.
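
A brief sketch of the relationship behind the relaxed assertions (an illustration, not code from the test; it assumes snapshot0FileCount counts only the data blobs physically present in the repository):

// The snapshot stats count every file of the commit, but files whose hash equals their
// contents ("v__" names) live inside the shard-level metadata and are never written as
// separate data blobs, so only an inequality holds:
//   stats.getTotalFileCount() >= snapshot0FileCount   (and likewise for the sizes)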

assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));

assertThat(stats.getIncrementalFileCount(), equalTo(snapshot0FileCount));
assertThat(stats.getIncrementalSize(), equalTo(snapshot0FileSize));
assertThat(stats.getIncrementalFileCount(), equalTo(stats.getTotalFileCount()));
assertThat(stats.getIncrementalSize(), equalTo(stats.getTotalSize()));

assertThat(stats.getIncrementalFileCount(), equalTo(stats.getProcessedFileCount()));
assertThat(stats.getIncrementalSize(), equalTo(stats.getProcessedSize()));
@@ -1175,17 +1175,17 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
ArrayList<Path> snapshotFilesDiff = new ArrayList<>(snapshot1Files);
snapshotFilesDiff.removeAll(snapshot0Files);

assertThat(anotherStats.getIncrementalFileCount(), equalTo(snapshotFilesDiff.size()));
assertThat(anotherStats.getIncrementalSize(), equalTo(calculateTotalFilesSize(snapshotFilesDiff)));
assertThat(anotherStats.getIncrementalFileCount(), greaterThanOrEqualTo(snapshotFilesDiff.size()));
assertThat(anotherStats.getIncrementalSize(), greaterThanOrEqualTo(calculateTotalFilesSize(snapshotFilesDiff)));

assertThat(anotherStats.getIncrementalFileCount(), equalTo(anotherStats.getProcessedFileCount()));
assertThat(anotherStats.getIncrementalSize(), equalTo(anotherStats.getProcessedSize()));

assertThat(stats.getTotalSize(), lessThan(anotherStats.getTotalSize()));
assertThat(stats.getTotalFileCount(), lessThan(anotherStats.getTotalFileCount()));

assertThat(anotherStats.getTotalFileCount(), is(snapshot1FileCount));
assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
assertThat(anotherStats.getTotalFileCount(), greaterThanOrEqualTo(snapshot1FileCount));
assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
}

public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
@@ -1035,7 +1035,8 @@ public void testUnrestorableFilesDuringRestore() throws Exception {
final String indexName = "unrestorable-files";
final int maxRetries = randomIntBetween(1, 10);

Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build();
Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries)
Contributor Author:
It's a little annoying, but if we run this test with more than a single shard, we might get shards that aren't corrupted because they contain no documents and, now, no physical data files, since only the .si and segments_N are uploaded for the empty shards.

.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).build();

Settings repositorySettings = Settings.builder()
.put("random", randomAlphaOfLength(10))