Skip to content

Commit 816b287

Browse files
committed
Prototype for snapshot blob cache system index
1 parent dbc5c26 commit 816b287

File tree

14 files changed

+1538
-34
lines changed

14 files changed

+1538
-34
lines changed

server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.Nullable;
2323

2424
import java.util.ArrayList;
25+
import java.util.Arrays;
2526
import java.util.Collections;
2627
import java.util.Iterator;
2728
import java.util.List;
@@ -39,6 +40,10 @@ public BlobPath() {
3940
this.paths = Collections.emptyList();
4041
}
4142

43+
public BlobPath(String path) {
44+
this.paths = path == null || path.isEmpty() ? Collections.emptyList() : Arrays.asList(path.split(SEPARATOR));
45+
}
46+
4247
public static BlobPath cleanPath() {
4348
return new BlobPath();
4449
}

server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
public abstract class FilterBlobContainer implements BlobContainer {
3535

36-
private final BlobContainer delegate;
36+
protected final BlobContainer delegate;
3737

3838
public FilterBlobContainer(BlobContainer delegate) {
3939
this.delegate = Objects.requireNonNull(delegate);

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
178178

179179
private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
180180

181-
private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
181+
public static final String UPLOADED_DATA_BLOB_PREFIX = "__";
182182

183183
/**
184184
* Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,7 @@ protected static boolean isXPackTemplate(String name) {
12231223
case "metrics":
12241224
case "metrics-settings":
12251225
case "metrics-mappings":
1226+
case ".snapshot-blob-cache":
12261227
return true;
12271228
default:
12281229
return false;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public static Map<String, String> filterSecurityHeaders(Map<String, String> head
6969
public static final String ASYNC_SEARCH_ORIGIN = "async_search";
7070
public static final String IDP_ORIGIN = "idp";
7171
public static final String STACK_ORIGIN = "stack";
72+
public static final String SEARCHABLE_SNAPSHOTS_ORIGIN = "searchable_snapshots";
7273

7374
private ClientHelper() {}
7475

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,6 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) {
4040

4141
public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming";
4242
public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool";
43+
44+
public static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache";
4345
}
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.blobstore.cache;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
12+
import org.elasticsearch.ExceptionsHelper;
13+
import org.elasticsearch.ResourceAlreadyExistsException;
14+
import org.elasticsearch.Version;
15+
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.NoShardAvailableActionException;
17+
import org.elasticsearch.action.get.GetRequest;
18+
import org.elasticsearch.action.index.IndexRequest;
19+
import org.elasticsearch.action.support.PlainActionFuture;
20+
import org.elasticsearch.client.Client;
21+
import org.elasticsearch.client.OriginSettingClient;
22+
import org.elasticsearch.cluster.metadata.IndexMetadata;
23+
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.blobstore.BlobPath;
25+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.unit.TimeValue;
28+
import org.elasticsearch.common.xcontent.ToXContent;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
30+
import org.elasticsearch.core.internal.io.IOUtils;
31+
import org.elasticsearch.index.IndexNotFoundException;
32+
import org.elasticsearch.rest.RestStatus;
33+
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
35+
36+
import java.io.IOException;
37+
import java.time.Instant;
38+
39+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
40+
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
41+
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
42+
43+
public class BlobStoreCacheService {
44+
45+
private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);
46+
47+
private static final TimeValue TIMEOUT = TimeValue.timeValueSeconds(30L);
48+
49+
private final ClusterService clusterService;
50+
protected final ThreadPool threadPool;
51+
private final Client client;
52+
private final String index;
53+
54+
public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) {
55+
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
56+
this.clusterService = clusterService;
57+
this.threadPool = threadPool;
58+
this.index = index;
59+
}
60+
61+
private void createIndexIfNecessary(ActionListener<String> listener) {
62+
if (clusterService.state().routingTable().hasIndex(index)) {
63+
listener.onResponse(index);
64+
return;
65+
}
66+
try {
67+
client.admin()
68+
.indices()
69+
.prepareCreate(index)
70+
.setSettings(settings())
71+
.setMapping(mappings())
72+
.execute(ActionListener.wrap(success -> listener.onResponse(index), e -> {
73+
if (e instanceof ResourceAlreadyExistsException
74+
|| ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
75+
listener.onResponse(index);
76+
} else {
77+
listener.onFailure(e);
78+
}
79+
}));
80+
} catch (Exception e) {
81+
listener.onFailure(e);
82+
}
83+
}
84+
85+
public CachedBlob get(String repository, String blobName, BlobPath blobPath, long offset) {
86+
final PlainActionFuture<CachedBlob> future = PlainActionFuture.newFuture();
87+
get(repository, blobName, blobPath, offset, future);
88+
return future.actionGet();
89+
}
90+
91+
public void get(String repository, String blobName, BlobPath blobPath, long position, ActionListener<CachedBlob> listener) {
92+
assert assertCurrentThread();
93+
createIndexIfNecessary(ActionListener.wrap(indexName -> {
94+
try {
95+
final GetRequest request = new GetRequest(index).id(CachedBlob.computeId(repository, blobName, blobPath, position));
96+
client.get(request, ActionListener.wrap(response -> {
97+
if (response.isExists()) {
98+
assert response.isSourceEmpty() == false;
99+
logger.trace("document [{}] found in cache", request);
100+
listener.onResponse(CachedBlob.fromSource(response.getSource()));
101+
// TODO trigger an update of last accessed time ?
102+
} else {
103+
logger.trace("document [{}] not found in cache", request);
104+
listener.onResponse(null);
105+
}
106+
}, e -> {
107+
if (e instanceof IndexNotFoundException || e instanceof NoShardAvailableActionException) {
108+
// Blob store cache system index might not be available at that time,
109+
// so we pretend we didn't find a cache entry and we move on.
110+
//
111+
// Failing here would bubble up the exception and fail the searchable
112+
// snapshot shard which is potentially recovering.
113+
//
114+
listener.onResponse(null);
115+
} else {
116+
listener.onFailure(e);
117+
}
118+
}));
119+
} catch (Exception e) {
120+
listener.onFailure(e);
121+
}
122+
123+
}, listener::onFailure));
124+
}
125+
126+
public void put(String repository, String blobName, BlobPath blobPath, ReleasableBytesReference content, long offset) {
127+
createIndexIfNecessary(new ActionListener<>() {
128+
@Override
129+
public void onResponse(String indexName) {
130+
final Instant now = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis());
131+
try {
132+
final IndexRequest request = createIndexRequest(
133+
new CachedBlob(now, now, Version.CURRENT, repository, blobName, blobPath, content, offset)
134+
);
135+
client.index(request, ActionListener.runAfter(ActionListener.wrap(response -> {
136+
if (response.status() == RestStatus.CREATED) {
137+
logger.trace("document [{}] successfully indexed in [{}]", request, indexName);
138+
}
139+
},
140+
e -> logger.error(
141+
() -> new ParameterizedMessage("failed to index document [{}] in [{}] system index", request, indexName),
142+
e
143+
)
144+
), () -> IOUtils.closeWhileHandlingException(content)));
145+
} catch (IOException e) {
146+
logger.warn(() -> new ParameterizedMessage("failed to index document in [{}] system index", indexName), e);
147+
IOUtils.closeWhileHandlingException(content);
148+
}
149+
}
150+
151+
@Override
152+
public void onFailure(Exception e) {
153+
logger.error(() -> new ParameterizedMessage("failed to create [{}] system index", index), e);
154+
IOUtils.closeWhileHandlingException(content);
155+
}
156+
});
157+
}
158+
159+
private static Settings settings() {
160+
return Settings.builder()
161+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
162+
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
163+
.put(IndexMetadata.SETTING_PRIORITY, "900")
164+
.build();
165+
}
166+
167+
private static XContentBuilder mappings() throws IOException {
168+
final XContentBuilder builder = jsonBuilder();
169+
{
170+
builder.startObject();
171+
{
172+
builder.startObject(SINGLE_MAPPING_NAME);
173+
builder.field("dynamic", "false");
174+
{
175+
builder.startObject("_meta");
176+
builder.field("version", Version.CURRENT);
177+
builder.endObject();
178+
}
179+
{
180+
builder.startObject("properties");
181+
{
182+
builder.startObject("type");
183+
builder.field("type", "keyword");
184+
builder.endObject();
185+
}
186+
{
187+
builder.startObject("creation_time");
188+
builder.field("type", "date");
189+
builder.field("format", "epoch_millis");
190+
builder.endObject();
191+
}
192+
{
193+
builder.startObject("accessed_time");
194+
builder.field("type", "date");
195+
builder.field("format", "epoch_millis");
196+
builder.endObject();
197+
}
198+
{
199+
builder.startObject("version");
200+
builder.field("type", "integer");
201+
builder.endObject();
202+
}
203+
{
204+
builder.startObject("repository");
205+
builder.field("type", "keyword");
206+
builder.endObject();
207+
}
208+
{
209+
builder.startObject("blob");
210+
builder.field("type", "object");
211+
{
212+
builder.startObject("properties");
213+
{
214+
builder.startObject("name");
215+
builder.field("type", "keyword");
216+
builder.endObject();
217+
builder.startObject("path");
218+
builder.field("type", "keyword");
219+
builder.endObject();
220+
}
221+
builder.endObject();
222+
}
223+
builder.endObject();
224+
}
225+
{
226+
builder.startObject("data");
227+
builder.field("type", "object");
228+
{
229+
builder.startObject("properties");
230+
{
231+
builder.startObject("content");
232+
builder.field("type", "binary");
233+
builder.endObject();
234+
}
235+
{
236+
builder.startObject("length");
237+
builder.field("type", "long");
238+
builder.endObject();
239+
}
240+
{
241+
builder.startObject("offset");
242+
builder.field("type", "long");
243+
builder.endObject();
244+
}
245+
builder.endObject();
246+
}
247+
builder.endObject();
248+
}
249+
builder.endObject();
250+
}
251+
builder.endObject();
252+
}
253+
builder.endObject();
254+
}
255+
return builder;
256+
}
257+
258+
private IndexRequest createIndexRequest(CachedBlob cachedBlob) throws IOException {
259+
final IndexRequest indexRequest = new IndexRequest(index);
260+
indexRequest.id(cachedBlob.id());
261+
try (XContentBuilder xContentBuilder = jsonBuilder()) {
262+
indexRequest.source(cachedBlob.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS));
263+
}
264+
return indexRequest;
265+
}
266+
267+
private boolean assertCurrentThread() {
268+
final String threadName = Thread.currentThread().getName();
269+
assert threadName.contains('[' + ThreadPool.Names.GENERIC + ']')
270+
|| threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']')
271+
|| threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']') : "Unexpected thread name "
272+
+ threadName;
273+
return true;
274+
}
275+
}

0 commit comments

Comments
 (0)