Skip to content

Commit e2438cb

Browse files
committed
few adjustments
1 parent 5b3960d commit e2438cb

File tree

11 files changed

+248
-155
lines changed

11 files changed

+248
-155
lines changed

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,22 @@ public interface BlobContainer {
4949
*/
5050
InputStream readBlob(String blobName) throws IOException;
5151

52+
/**
53+
* Creates a new {@link InputStream} that can be used to read the given blob starting from
54+
* a specific {@code position} in the blob. The {@code length} is an indication of the
55+
* number of bytes that are expected to be read from the {@link InputStream}.
56+
*
57+
* @param blobName The name of the blob to get an {@link InputStream} for.
58+
* @param position The position in the blob where the next byte will be read.
59+
* @param length An indication of the number of bytes to be read.
60+
* @return The {@code InputStream} to read the blob.
61+
* @throws NoSuchFileException if the blob does not exist
62+
* @throws IOException if the blob can not be read.
63+
*/
64+
default InputStream readBlob(final String blobName, final long position, final int length) throws IOException {
65+
throw new UnsupportedOperationException(); // NORELEASE
66+
}
67+
5268
/**
5369
* Reads blob content from the input stream and writes it to the container in a new blob with the given name.
5470
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ public InputStream readBlob(String name) throws IOException {
163163
}
164164
}
165165

166+
@Override
167+
public InputStream readBlob(String blobName, long position, int length) throws IOException {
168+
final InputStream inputStream = readBlob(blobName);
169+
long skipped = inputStream.skip(position); // NORELEASE
170+
assert skipped == position;
171+
return inputStream;
172+
}
173+
166174
@Override
167175
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
168176
if (failIfAlreadyExists == false) {

server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.service.ClusterService;
2626
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2727
import org.elasticsearch.env.Environment;
28+
import org.elasticsearch.repositories.RepositoriesModule;
2829
import org.elasticsearch.repositories.Repository;
2930

3031
/**
@@ -58,4 +59,13 @@ default Map<String, Repository.Factory> getInternalRepositories(Environment env,
5859
ClusterService clusterService) {
5960
return Collections.emptyMap();
6061
}
62+
63+
/**
64+
* Passes down the current {@link RepositoriesModule} to repository plugins.
65+
*
66+
* @param module the current {@link RepositoriesModule}
67+
*/
68+
default void onRepositoriesModule(RepositoriesModule module) {
69+
// NORELEASE
70+
}
6171
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
910910
return shardContainer(indexId, shardId.getId());
911911
}
912912

913-
private BlobContainer shardContainer(IndexId indexId, int shardId) {
913+
public BlobContainer shardContainer(IndexId indexId, int shardId) {
914914
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
915915
}
916916

@@ -942,9 +942,11 @@ public long getRestoreThrottleTimeInNanos() {
942942
}
943943

944944
protected void assertSnapshotOrGenericThread() {
945+
// NORELEASE
946+
/*
945947
assert Thread.currentThread().getName().contains(ThreadPool.Names.SNAPSHOT)
946948
|| Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC) :
947-
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";
949+
"Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread.";*/
948950
}
949951

950952
@Override
@@ -1663,7 +1665,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
16631665
/**
16641666
* Loads information about shard snapshot
16651667
*/
1666-
private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
1668+
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
16671669
try {
16681670
return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
16691671
} catch (NoSuchFileException ex) {

x-pack/plugin/searchable-snapshots/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,3 @@ dependencies {
1818
// installing them as individual plugins for integ tests doesn't make sense,
1919
// so we disable integ tests
2020
integTest.enabled = false
21-
22-
test.enabled = false

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BlobBytesReader.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.store.IndexInput;
1212
import org.apache.lucene.store.IndexOutput;
1313
import org.apache.lucene.store.SingleInstanceLockFactory;
14+
import org.elasticsearch.common.blobstore.BlobContainer;
1415
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
1516
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
1617

@@ -25,21 +26,21 @@
2526
* implementation does not allow modification of the directory files and only supports {@link #listAll()}, {@link #fileLength(String)} and
2627
* {@link #openInput(String, IOContext)} methods.
2728
*
28-
* To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a way to read these files must be provided. The
29-
* definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name of the snapshot
30-
* and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original shard files
31-
* and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by Lucene with
32-
* the one (or the ones) corresponding blob(s) in the snapshot.
29+
* To create a {@link SearchableSnapshotDirectory} both the list of the snapshot files and a {@link BlobContainer} to read these files must
30+
* be provided. The definition of the snapshot files are provided using a {@link BlobStoreIndexShardSnapshot} object which contains the name
31+
* of the snapshot and all the files it contains along with their metadata. Because there is no one-to-one relationship between the original
32+
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by
33+
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot.
3334
*/
3435
public class SearchableSnapshotDirectory extends BaseDirectory {
3536

3637
private final BlobStoreIndexShardSnapshot snapshot;
37-
private final BlobBytesReader reader;
38+
private final BlobContainer blobContainer;
3839

39-
protected SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobBytesReader reader) {
40+
public SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) {
4041
super(new SingleInstanceLockFactory());
4142
this.snapshot = Objects.requireNonNull(snapshot);
42-
this.reader = Objects.requireNonNull(reader);
43+
this.blobContainer = Objects.requireNonNull(blobContainer);
4344
}
4445

4546
private FileInfo fileInfo(final String name) throws FileNotFoundException {
@@ -67,7 +68,7 @@ public long fileLength(final String name) throws IOException {
6768
@Override
6869
public IndexInput openInput(final String name, final IOContext context) throws IOException {
6970
ensureOpen();
70-
return new SearchableSnapshotIndexInput(reader, fileInfo(name));
71+
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name));
7172
}
7273

7374
@Override

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotIndexInput.java

Lines changed: 31 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
import org.apache.lucene.store.BufferedIndexInput;
99
import org.apache.lucene.store.IndexInput;
10+
import org.elasticsearch.common.blobstore.BlobContainer;
1011
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
1112

1213
import java.io.EOFException;
1314
import java.io.IOException;
15+
import java.io.InputStream;
1416
import java.util.Objects;
1517

1618
/**
@@ -33,28 +35,32 @@
3335
*/
3436
public class SearchableSnapshotIndexInput extends BufferedIndexInput {
3537

36-
private final BlobBytesReader reader;
38+
private final BlobContainer blobContainer;
3739
private final FileInfo fileInfo;
40+
private final long offset;
41+
private final long length;
3842

3943
private long position;
4044
private boolean closed;
4145

42-
public SearchableSnapshotIndexInput(final BlobBytesReader reader, final FileInfo fileInfo) {
43-
this("SearchableSnapshotIndexInput(" + fileInfo + ")", reader, fileInfo, 0L);
46+
public SearchableSnapshotIndexInput(final BlobContainer blobContainer, final FileInfo fileInfo) {
47+
this("SearchableSnapshotIndexInput(" + fileInfo.physicalName() + ")", blobContainer, fileInfo, 0L, 0L, fileInfo.length());
4448
}
4549

46-
private SearchableSnapshotIndexInput(final String resourceDesc, final BlobBytesReader reader,
47-
final FileInfo fileInfo, final long position) {
50+
private SearchableSnapshotIndexInput(final String resourceDesc, final BlobContainer blobContainer,
51+
final FileInfo fileInfo, final long position, final long offset, final long length) {
4852
super(resourceDesc);
49-
this.reader = Objects.requireNonNull(reader);
53+
this.blobContainer = Objects.requireNonNull(blobContainer);
5054
this.fileInfo = Objects.requireNonNull(fileInfo);
55+
this.offset = offset;
56+
this.length = length;
5157
this.position = position;
5258
this.closed = false;
5359
}
5460

5561
@Override
5662
public long length() {
57-
return fileInfo.length();
63+
return length;
5864
}
5965

6066
private void ensureOpen() throws IOException {
@@ -88,29 +94,33 @@ protected void readInternal(byte[] b, int offset, int length) throws IOException
8894
}
8995

9096
private void readInternalBytes(final long part, final long pos, byte[] b, int offset, int length) throws IOException {
91-
reader.readBlobBytes(fileInfo.partName(part), pos, length, b, offset);
92-
position += length;
97+
try (InputStream inputStream = blobContainer.readBlob(fileInfo.partName(part), pos, length)) {
98+
int read = inputStream.read(b, offset, length);
99+
assert read == length;
100+
position += read;
101+
}
93102
}
94103

95104
@Override
96105
protected void seekInternal(long pos) throws IOException {
97-
if (pos > fileInfo.length()) {
98-
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + fileInfo.length() + "] for " + toString());
106+
if (pos > length) {
107+
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length + "] for " + toString());
99108
} else if (pos < 0L) {
100109
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
101110
}
102-
this.position = pos;
111+
this.position = offset + pos;
103112
}
104113

105114
@Override
106115
public BufferedIndexInput clone() {
107-
return new SearchableSnapshotIndexInput("clone(" + this + ")", reader, fileInfo, position);
116+
return new SearchableSnapshotIndexInput("clone(" + this + ")", blobContainer, fileInfo, position, offset, length);
108117
}
109118

110119
@Override
111120
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
112121
if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
113-
final Slice slice = new Slice(sliceDescription, offset, length, this);
122+
final SearchableSnapshotIndexInput slice =
123+
new SearchableSnapshotIndexInput(sliceDescription, blobContainer, fileInfo, position, this.offset + offset, length);
114124
slice.seek(0L);
115125
return slice;
116126
} else {
@@ -126,57 +136,12 @@ public void close() throws IOException {
126136

127137
@Override
128138
public String toString() {
129-
return getClass().getSimpleName()
130-
+ "(resourceDesc=" + super.toString()
131-
+ ", name=" + fileInfo.physicalName()
132-
+ ", length=" + fileInfo.length()
133-
+ ", sizeOfParts=" + fileInfo.partSize()
134-
+ ", numberOfParts=" + fileInfo.numberOfParts() + ")";
135-
}
136-
137-
/**
138-
* A slice created from a {@link SearchableSnapshotIndexInput}.
139-
*
140-
* The slice overrides the {@link #length()} and {@link #seekInternal(long)}
141-
* methods so that it adjust the values according to initial offset position
142-
* from which the slice was created.
143-
*/
144-
private static class Slice extends SearchableSnapshotIndexInput {
145-
146-
private final long offset;
147-
private final long length;
148-
149-
Slice(String sliceDescription, long offset, long length, SearchableSnapshotIndexInput base) {
150-
super(base.toString() + " [slice=" + sliceDescription + "]", base.reader, base.fileInfo, base.position);
151-
this.offset = offset;
152-
this.length = length;
153-
}
154-
155-
@Override
156-
public long length() {
157-
return length;
158-
}
159-
160-
@Override
161-
protected void seekInternal(long pos) throws IOException {
162-
super.seekInternal(offset + pos);
163-
}
164-
165-
@Override
166-
public BufferedIndexInput clone() {
167-
return new Slice("clone(" + this + ")", offset, length, this);
168-
}
169-
170-
@Override
171-
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
172-
if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
173-
final Slice slice = new Slice(sliceDescription, offset + this.offset, length, this);
174-
slice.seek(0L);
175-
return slice;
176-
} else {
177-
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset
178-
+ ",length=" + length + ",fileLength=" + length() + ": " + this);
179-
}
180-
}
139+
return "SearchableSnapshotIndexInput{" +
140+
"resourceDesc=" + super.toString() +
141+
", fileInfo=" + fileInfo +
142+
", offset=" + offset +
143+
", length=" + length +
144+
", position=" + position +
145+
'}';
181146
}
182147
}

0 commit comments

Comments
 (0)