@@ -49,6 +49,22 @@ public interface BlobContainer {
*/
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link InputStream} that can be used to read the given blob starting from
* a specific {@code position} in the blob. The {@code length} is an indication of the
* number of bytes that are expected to be read from the {@link InputStream}.
*
* @param blobName The name of the blob to get an {@link InputStream} for.
* @param position The position in the blob where the next byte will be read.
* @param length An indication of the number of bytes to be read.
* @return The {@code InputStream} to read the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob cannot be read.
*/
default InputStream readBlob(final String blobName, final long position, final int length) throws IOException {
Contributor:
Why would we want length on this API? Wouldn't it be better to just have IndexInput keep a reference to an open stream and only open a new stream if we seek backwards instead of opening a new stream of bounded length repeatedly?

Member Author:
It could be done like you suggest for the FS repository (and maybe HDFS too), but for other repositories we need to give an indication of the number of bytes we want to download because, unlike the RestoreService, we don't want to read whole blobs but only a chunk of each. Most SDKs require you to consume all the requested bytes (or will consume them under the hood for you), and we don't want to open a stream that reads a complete blob if we only use the first 28 bytes to read a header.

Contributor:

> Most SDKs require you to consume all the requested bytes (or will consume them under the hood for you), and we don't want to open a stream that reads a complete blob if we only use the first 28 bytes to read a header.

Fair point :) You have abort as a method on the S3 input stream though, not sure about GCS here.

throw new UnsupportedOperationException(); // NORELEASE
}
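
A usage sketch of the new API (illustrative only: blobContainer stands for any implementation supporting ranged reads, the blob name is a placeholder, and the 28-byte header size is borrowed from the discussion above):

// Illustrative usage: read only the first 28 bytes of a blob (e.g. a header)
// without streaming the whole object from the repository.
try (InputStream in = blobContainer.readBlob("__4vdpz_HFQ8CuKjCERX0o2A", 0L, 28)) {
    final byte[] header = in.readNBytes(28); // Java 11 API
    // parse the header...
}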

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
@@ -163,6 +163,14 @@ public InputStream readBlob(String name) throws IOException {
}
}

@Override
public InputStream readBlob(String blobName, long position, int length) throws IOException {
final InputStream inputStream = readBlob(blobName);
Contributor:
I think you could just use this as the default implementation in BlobContainer instead of throwing. I think all our streams support skip? :)

Member Author:
This is hacky, and only least harmful for the FS repository, so I preferred to implement it in the FS blob container and leave the other implementations unsupported until the method is correctly implemented for each of them (using byte-range downloads).

long skipped = inputStream.skip(position); // NORELEASE
assert skipped == position;
return inputStream;
}
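
The single skip call is what the NORELEASE marker flags: InputStream.skip may legitimately skip fewer bytes than requested, which is also why the assert is unsafe outside tests. A defensive sketch (not part of this change; assumes java.io.EOFException is imported):

// Sketch only: loop because skip() may skip fewer bytes than requested.
private static void skipFully(InputStream in, long position) throws IOException {
    long remaining = position;
    while (remaining > 0L) {
        final long skipped = in.skip(remaining);
        if (skipped <= 0L) {
            throw new EOFException("failed to skip [" + remaining + "] remaining bytes");
        }
        remaining -= skipped;
    }
}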

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
if (failIfAlreadyExists == false) {
@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;

/**
@@ -58,4 +59,13 @@ default Map<String, Repository.Factory> getInternalRepositories(Environment env,
ClusterService clusterService) {
return Collections.emptyMap();
}

/**
* Passes down the current {@link RepositoriesModule} to repository plugins.
*
* @param module the current {@link RepositoriesModule}
*/
default void onRepositoriesModule(RepositoriesModule module) {
// NORELEASE
}
}
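
For illustration, a repository plugin could pick up the module through the new hook like this (a minimal sketch; MyRepositoryPlugin and its field are hypothetical, and the usual org.elasticsearch.plugins imports are assumed):

// Hypothetical plugin implementation (sketch):
public class MyRepositoryPlugin extends Plugin implements RepositoryPlugin {

    private RepositoriesModule repositoriesModule;

    @Override
    public void onRepositoriesModule(RepositoriesModule module) {
        // keep a handle on the module so the plugin can reach repositories later
        this.repositoriesModule = module;
    }
}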
@@ -910,7 +910,7 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
return shardContainer(indexId, shardId.getId());
}

- private BlobContainer shardContainer(IndexId indexId, int shardId) {
+ public BlobContainer shardContainer(IndexId indexId, int shardId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
}

Expand Down Expand Up @@ -942,9 +942,11 @@ public long getRestoreThrottleTimeInNanos() {
}

protected void assertSnapshotOrGenericThread() {
+ // NORELEASE
+ /*
assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
|| Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
- "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
+ "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/
}

@Override
@@ -1663,7 +1665,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
/**
* Loads information about shard snapshot
*/
- private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
+ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
} catch (NoSuchFileException ex) {
2 changes: 0 additions & 2 deletions x-pack/plugin/searchable-snapshots/build.gradle
@@ -18,5 +18,3 @@ dependencies {
// installing them as individual plugins for integ tests doesn't make sense,
// so we disable integ tests
integTest.enabled = false
-
- test.enabled = false
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store;

import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;

/**
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshots are immutable, this
* implementation does not allow modification of the directory files and only supports the {@link #listAll()}, {@link #fileLength(String)}
* and {@link #openInput(String, IOContext)} methods.
*
* To create a {@link SearchableSnapshotDirectory}, both the list of the snapshot files and a {@link BlobContainer} to read these files
* must be provided. The snapshot files are described by a {@link BlobStoreIndexShardSnapshot} object, which contains the name of the
* snapshot and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original
* shard files and what is stored in the snapshot, the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name, as
* expected by Lucene, to the corresponding blob (or blobs) in the snapshot.
*/
public class SearchableSnapshotDirectory extends BaseDirectory {

private final BlobStoreIndexShardSnapshot snapshot;
private final BlobContainer blobContainer;

public SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) {
super(new SingleInstanceLockFactory());
this.snapshot = Objects.requireNonNull(snapshot);
this.blobContainer = Objects.requireNonNull(blobContainer);
}

private FileInfo fileInfo(final String name) throws FileNotFoundException {
return snapshot.indexFiles().stream()
.filter(fileInfo -> fileInfo.physicalName().equals(name))
.findFirst()
.orElseThrow(() -> new FileNotFoundException(name));
}

@Override
public String[] listAll() throws IOException {
ensureOpen();
return snapshot.indexFiles().stream()
.map(FileInfo::physicalName)
.sorted(String::compareTo)
.toArray(String[]::new);
}

@Override
public long fileLength(final String name) throws IOException {
ensureOpen();
return fileInfo(name).length();
}

@Override
public IndexInput openInput(final String name, final IOContext context) throws IOException {
ensureOpen();
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name));
}

@Override
public void close() {
isOpen = false;
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory;
}

@Override
public Set<String> getPendingDeletions() {
throw unsupportedException();
}

@Override
public void sync(Collection<String> names) {
throw unsupportedException();
}

@Override
public void syncMetaData() {
throw unsupportedException();
}

@Override
public void deleteFile(String name) {
throw unsupportedException();
}

@Override
public IndexOutput createOutput(String name, IOContext context) {
throw unsupportedException();
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw unsupportedException();
}

@Override
public void rename(String source, String dest) {
throw unsupportedException();
}

private static UnsupportedOperationException unsupportedException() {
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
}
}
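
The visibility changes to shardContainer and loadShardSnapshot in BlobStoreRepository above are what make this directory reachable from a plugin. A sketch of how the pieces could combine (the helper method is illustrative, not part of the PR):

// Illustrative helper (sketch): open a snapshotted shard as a Lucene Directory.
static Directory openSnapshotShard(BlobStoreRepository repository, IndexId indexId,
                                   int shardId, SnapshotId snapshotId) {
    final BlobContainer shardContainer = repository.shardContainer(indexId, shardId);
    final BlobStoreIndexShardSnapshot snapshot = repository.loadShardSnapshot(shardContainer, snapshotId);
    return new SearchableSnapshotDirectory(snapshot, shardContainer);
}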
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.index.store;

import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
* A {@link SearchableSnapshotIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because
* a large Lucene file might be split into multiple parts during the snapshot, {@link SearchableSnapshotIndexInput} requires a
* {@link FileInfo} object at creation time. This object is used to retrieve the name and length of the original Lucene file, as well
* as all the parts (stored as "blobs" in the repository) that compose the file in the snapshot.
*
* For example, the following {@link FileInfo}:
* [name: __4vdpz_HFQ8CuKjCERX0o2A, numberOfParts: 2, partSize: 997b, partBytes: 997, metadata: name [_0_Asserting_0.pos], length [1413]
*
* indicates that the Lucene file "_0_Asserting_0.pos" has a total length of 1413 bytes and is snapshotted into 2 parts:
* - __4vdpz_HFQ8CuKjCERX0o2A.part1 of size 997b
* - __4vdpz_HFQ8CuKjCERX0o2A.part2 of size 416b
*
* {@link SearchableSnapshotIndexInput} maintains a global position that indicates where in the Lucene file the next read will occur. For
* a Lucene file snapshotted into multiple parts, this position is used to identify which part must be read at which position (see
* {@link #readInternal(byte[], int, int)}). This position is also passed on to cloned and sliced inputs, along with the {@link FileInfo},
* so that they can track their own reading position.
*/
public class SearchableSnapshotIndexInput extends BufferedIndexInput {

private final BlobContainer blobContainer;
private final FileInfo fileInfo;
private final long offset;
private final long length;

private long position;
private boolean closed;

public SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo) {
this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length());
}

private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer,
final FileInfo fileInfo, final long position, final long offset, final long length) {
super(resourceDesc);
this.blobContainer = Objects.requireNonNull(blobContainer);
this.fileInfo = Objects.requireNonNull(fileInfo);
this.offset = offset;
this.length = length;
this.position = position;
this.closed = false;
}

@Override
public long length() {
return length;
}

private void ensureOpen() throws IOException {
if (closed) {
throw new IOException(toString() + " is closed");
}
}

@Override
protected void readInternal(byte[] b, int offset, int length) throws IOException {
ensureOpen();
if (fileInfo.numberOfParts() == 1L) {
Contributor:
Why not reuse the logic from the restore codebase that has a sliced input stream already here instead of building the same thing again? (also see my comment on keeping a reference to a stream open until we're seeking backwards).

Member Author:
That's a good suggestion, we should be able to use SlicedInputStream combined with the length parameter. I'll take a look :)

readInternalBytes(0L, position, b, offset, length);
} else {
int len = length;
int off = offset;
while (len > 0) {
long currentPart = position / fileInfo.partSize().getBytes();
int remainingBytesInPart;
if (currentPart < (fileInfo.numberOfParts() - 1)) {
remainingBytesInPart = Math.toIntExact(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position);
} else {
remainingBytesInPart = Math.toIntExact(fileInfo.length() - position);
}
final int read = Math.min(len, remainingBytesInPart);
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, off, read);
len -= read;
off += read;
}
}
}
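
As a worked check of the part arithmetic above, using the example FileInfo from the class javadoc (partSize 997, length 1413, 2 parts):

// Worked example (values from the javadoc above; the position is hypothetical):
long partSize = 997L, fileLength = 1413L;
long position = 1000L;                              // global position in the file
long currentPart = position / partSize;             // = 1, i.e. the last part
long posInPart = position % partSize;               // = 3
long remainingBytesInPart = fileLength - position;  // = 413 (last-part branch)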

private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException {
try (InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, length)) {
int read = inputStream.read(b, offset, length);
assert read == length;
position += read;
}
}
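
Like the skip call in the FS container, the assert here papers over the fact that InputStream.read may return fewer bytes than requested. A defensive sketch of the same read loop (not part of this change; assumes java.io.EOFException is imported):

// Sketch only: loop until the requested number of bytes has been read.
int totalRead = 0;
while (totalRead < length) {
    final int n = inputStream.read(b, offset + totalRead, length - totalRead);
    if (n < 0) {
        throw new EOFException("unexpected end of blob [" + fileInfo.partName(part) + "]");
    }
    totalRead += n;
}
position += totalRead;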

@Override
protected void seekInternal(long pos) throws IOException {
if (pos > length) {
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length + "] for " + toString());
} else if (pos < 0L) {
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
}
this.position = offset + pos;
}

@Override
public BufferedIndexInput clone() {
return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length);
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
final SearchableSnapshotIndexInput slice =
new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, this.offset + offset, length);
slice.seek(0L);
return slice;
} else {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset
+ ",length=" + length + ",fileLength=" + length() + ": " + this);
}
}

@Override
public void close() throws IOException {
closed = true;
}

@Override
public String toString() {
return "SearchableSnapshotIndexInput{" +
"resourceDesc=" + super.toString() +
", fileInfo=" + fileInfo +
", offset=" + offset +
", length=" + length +
", position=" + position +
'}';
}
}