Shadow replicas on shared filesystems #9727
Changes from 53 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| [[indices-shadow-replicas]] | ||
| == Shadow replica indices | ||
|
|
||
| If you would like to use a shared filesystem, you can use the shadow replicas | ||
| settings to choose where on disk the data for an index should be kept, as well | ||
| as how Elasticsearch should replay operations on all the replica shards of an | ||
| index. | ||
|
|
||
| In order to fully utilize the `index.data_path` and `index.shadow_replicas` | ||
| settings, you need to enable using it in elasticsearch.yml: | ||
|
|
||
| [source,yaml] | ||
| -------------------------------------------------- | ||
| node.enable_custom_paths: true | ||
| -------------------------------------------------- | ||
|
|
||
| You can then create an index with a custom data path, where each node will use | ||
| this path for the data: | ||
|
|
||
| [source,js] | ||
| -------------------------------------------------- | ||
| curl -XPUT 'localhost:9200/my_index' -d ' | ||
| { | ||
| "index" : { | ||
| "number_of_shards" : 1, | ||
| "number_of_replicas" : 4, | ||
| "data_path": "/var/data/my_index", | ||
| "shadow_replicas": true | ||
| } | ||
| }' | ||
| -------------------------------------------------- | ||
|
|
||
| [WARNING] | ||
| ======================== | ||
| In the above example, the "/var/data/my_index" path is a shared filesystem that | ||
| must be available on every node in the Elasticsearch cluster. You must also | ||
| ensure that the Elasticsearch process has the correct permissions to read from | ||
| and write to the directory used in the `index.data_path` setting. | ||
| ======================== | ||
|
|
||
| An index that has been created with the `index.shadow_replicas` setting set to | ||
| "true" will not replicate document operations to any of the replica shards, | ||
| instead, it will only continually refresh. Once segments are available on the | ||
| filesystem where the shadow replica resides (after an Elasticsearch "flush"), a | ||
| regular refresh (governed by the `index.refresh_interval`) can be used to make | ||
| the new data searchable. | ||
|
|
||
| In order to ensure the data is being synchronized in a fast enough manner, you | ||
| may need to tune the flush threshold for the index to a desired number. A flush | ||
| is needed to fsync segment files to disk, so they will be visible to all other | ||
| replica nodes. Users should test what flush threshold levels they are | ||
| comfortable with, as increased flushing can impact indexing performance. | ||
|
|
||
| The Elasticsearch cluster will still detect the loss of a primary shard, and | ||
| transform the replica into a primary in this situation. This transformation will | ||
| take slightly longer, since no `IndexWriter` is maintained for each shadow | ||
| replica. | ||
|
|
||
| Below is the list of settings that can be changed using the update | ||
| settings API: | ||
|
|
||
| `index.data_path` (string):: | ||
| Path to use for the index's data. Note that by default Elasticsearch will | ||
| append the node ordinal by default to the path to ensure multiple instances | ||
| of Elasticsearch on the same machine do not share a data directory. | ||
|
|
||
| `index.shadow_replicas`:: | ||
| Boolean value indicating this index should use shadow replicas. Defaults to | ||
| `false`. | ||
|
|
||
| `index.shared_filesystem`:: | ||
| Boolean value indicating this index uses a shared filesystem. Defaults to | ||
| the `true` if `index.shadow_replicas` is set to true, `false` otherwise. | ||
|
|
||
| === Node level settings related to shadow replicas | ||
|
|
||
| These are non-dynamic settings that need to be configured in `elasticsearch.yml` | ||
|
|
||
| `node.add_id_to_custom_path`:: | ||
| Boolean setting indicating whether Elasticsearch should append the node's | ||
| ordinal to the custom data path. For example, if this is enabled and a path | ||
| of "/tmp/foo" is used, the first locally-running node will use "/tmp/foo/0", | ||
| the second will use "/tmp/foo/1", the third "/tmp/foo/2", etc. Defaults to | ||
| `true`. | ||
|
|
||
| `node.enable_custom_paths`:: | ||
| Boolean value that must be set to `true` in order to use the | ||
| `index.data_path` setting. Defaults to `false`. | ||
|
|
||
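To make the `node.add_id_to_custom_path` behaviour concrete, the per-node path layout it describes can be sketched standalone. This is only an illustration of the documented layout; `resolveCustomPath` is a hypothetical helper, not part of Elasticsearch's API:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CustomPathResolver {
    /**
     * Resolves the effective data path for a node. When addIdToCustomPath is
     * enabled (the default), the node's ordinal is appended so that multiple
     * nodes on the same machine do not share a data directory.
     */
    static Path resolveCustomPath(String customDataPath, int nodeOrdinal,
                                  boolean addIdToCustomPath) {
        Path base = Paths.get(customDataPath);
        return addIdToCustomPath ? base.resolve(Integer.toString(nodeOrdinal)) : base;
    }

    public static void main(String[] args) {
        // First two locally-running nodes with the default setting enabled:
        System.out.println(resolveCustomPath("/tmp/foo", 0, true)); // /tmp/foo/0
        System.out.println(resolveCustomPath("/tmp/foo", 1, true)); // /tmp/foo/1
        // With node.add_id_to_custom_path: false, the path is used as-is:
        System.out.println(resolveCustomPath("/tmp/foo", 0, false)); // /tmp/foo
    }
}
```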
@@ -45,11 +45,11 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
-import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.rest.RestStatus;
@@ -631,9 +631,22 @@ void performOnReplica(final ReplicationState state, final ShardRouting shard, fi
         }

         final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());

+        // If the replicas use shadow replicas, there is no reason to
+        // perform the action on the replica, so skip it and
+        // immediately return
+        if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
+            // this doesn't replicate mappings changes, so can fail if mappings are not predefined
+            // It was successful on the replica, although we never actually executed - in the future we will
+            // ack mapping updates with the master and that will sync with replicas. For now this is just fine.
+            state.onReplicaSuccess();
+            return;
+        }

> **Reviewer (Member)**, on the comment above: I would clarify this statement, because mapping updates do get replicated; it just takes longer, since the update needs to go to the master and then be published to the replicas, so there is a delay in mapping introduction.
>
> **Author (Member):** I will clarify this comment.

         if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
             final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
-            transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+            transportService.sendRequest(node, transportReplicaAction, shardRequest,
+                    transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                 @Override
                 public void handleResponse(TransportResponse.Empty vResponse) {
                     state.onReplicaSuccess();
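The short-circuit added above can be illustrated standalone. The sketch below uses hypothetical types in place of Elasticsearch's replication machinery; it shows only the control flow (ack immediately instead of dispatching to replicas), not the real API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShadowReplicationSketch {
    // Minimal stand-in for the per-replica operation dispatch.
    static List<String> performOnReplicas(boolean shadowReplicas, List<String> replicaNodes) {
        List<String> sentTo = new ArrayList<>();
        for (String node : replicaNodes) {
            // With shadow replicas the segments arrive via the shared
            // filesystem, so the document operation is acknowledged
            // immediately instead of being sent to the replica.
            if (shadowReplicas) {
                continue; // state.onReplicaSuccess() would be called here
            }
            sentTo.add(node); // transportService.sendRequest(...) in the real code
        }
        return sentTo;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node1", "node2");
        System.out.println(performOnReplicas(true, nodes));  // []
        System.out.println(performOnReplicas(false, nodes)); // [node1, node2]
    }
}
```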
@@ -157,6 +157,8 @@ public static State fromString(String state) {

     public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
     public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
+    public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
+    public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
     public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
     public static final String SETTING_READ_ONLY = "index.blocks.read_only";
     public static final String SETTING_BLOCKS_READ = "index.blocks.read";
@@ -784,4 +786,23 @@ public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws
         }
     }
 }

+    /**
+     * Returns <code>true</code> iff the given settings indicate that the index associated
+     * with these settings allocates its shards on a shared filesystem. Otherwise <code>false</code>. The default
+     * value is the value returned by {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
+     */
+    public static boolean usesSharedFilesystem(Settings settings) {

> **Reviewer (Member):** Can we use the same method structure between this method and the following? I like
>
> **Author (Member):** Yep, I'll rename.

+        return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
+    }
+
+    /**
+     * Returns <code>true</code> iff the given settings indicate that the index associated
+     * with these settings uses shadow replicas. Otherwise <code>false</code>. The default
+     * value is <code>false</code>.
+     */
+    public static boolean isIndexUsingShadowReplicas(Settings settings) {
+        return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
+    }
 }
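The defaulting chain between these two settings (an unset `index.shared_filesystem` inherits the value of `index.shadow_replicas`) can be sketched standalone. A plain `Map` stands in for Elasticsearch's `Settings` class here, which is an assumption made for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ShadowSettingsSketch {
    static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
    static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";

    // Mimics Settings.getAsBoolean: fall back to the default when unset.
    static boolean getAsBoolean(Map<String, String> settings, String key, boolean defaultValue) {
        String value = settings.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    // Defaults to false when the setting is absent.
    static boolean isIndexUsingShadowReplicas(Map<String, String> settings) {
        return getAsBoolean(settings, SETTING_SHADOW_REPLICAS, false);
    }

    // Defaults to whatever isIndexUsingShadowReplicas returns, so enabling
    // shadow replicas implies a shared filesystem unless explicitly overridden.
    static boolean usesSharedFilesystem(Map<String, String> settings) {
        return getAsBoolean(settings, SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put(SETTING_SHADOW_REPLICAS, "true");
        // shared_filesystem is unset, so it inherits the shadow_replicas value:
        System.out.println(usesSharedFilesystem(settings)); // true
        settings.put(SETTING_SHARED_FILESYSTEM, "false");
        System.out.println(usesSharedFilesystem(settings)); // false
    }
}
```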
@@ -282,7 +282,7 @@ public String indexUUID() {
         return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
     }

-    public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException {
+    public synchronized IndexShard createShard(int sShardId, boolean primary) throws ElasticsearchException {
         /*
          * TODO: we execute this in parallel but it's a synced method. Yet, we might
          * be able to serialize the execution via the cluster state in the future. for now we just

@@ -304,15 +304,16 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticsearchExc
         indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);

         logger.debug("creating shard_id {}", shardId);

+        // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
+        final boolean ownsShard = IndexMetaData.usesSharedFilesystem(indexSettings) == false || (primary && IndexMetaData.usesSharedFilesystem(indexSettings));

> **Reviewer (Contributor):** I wonder if we should name this after what we use it for ie.
>
> **Author (Member):** Sure, I will rename this.

         ModulesBuilder modules = new ModulesBuilder();
         modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
-        modules.add(new IndexShardModule(shardId, indexSettings));
+        modules.add(new IndexShardModule(shardId, primary, indexSettings));
         modules.add(new ShardIndexingModule());
         modules.add(new ShardSearchModule());
         modules.add(new ShardGetModule());
         modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
-                new StoreCloseListener(shardId)));
+                new StoreCloseListener(shardId, ownsShard)));
         modules.add(new DeletionPolicyModule(indexSettings));
         modules.add(new MergePolicyModule(indexSettings));
         modules.add(new MergeSchedulerModule(indexSettings));
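The ownership rule computed in `createShard` reduces to a simple boolean: a node owns a shard (and may safely delete its store) unless the index sits on a shared filesystem and this copy is not the primary. A standalone sketch checking that the verbose expression from the diff is equivalent to the simplified form (the helper names are illustrative, not Elasticsearch API):

```java
public class ShardOwnership {
    /** Transcription of the expression used in createShard. */
    static boolean ownsShardVerbose(boolean sharedFilesystem, boolean primary) {
        return sharedFilesystem == false || (primary && sharedFilesystem);
    }

    /** Equivalent simplification: own the shard unless it is a non-primary on a shared FS. */
    static boolean ownsShardSimple(boolean sharedFilesystem, boolean primary) {
        return !sharedFilesystem || primary;
    }

    public static void main(String[] args) {
        // Exhaustively check that the two forms agree for all inputs.
        for (boolean shared : new boolean[]{false, true}) {
            for (boolean primary : new boolean[]{false, true}) {
                if (ownsShardVerbose(shared, primary) != ownsShardSimple(shared, primary)) {
                    throw new AssertionError("forms disagree for shared=" + shared + ", primary=" + primary);
                }
                System.out.println("shared=" + shared + " primary=" + primary
                        + " owns=" + ownsShardSimple(shared, primary));
            }
        }
    }
}
```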
@@ -430,10 +431,12 @@ private void closeInjectorResource(ShardId shardId, Injector shardInjector, Clas
         }
     }

-    private void onShardClose(ShardLock lock) {
+    private void onShardClose(ShardLock lock, boolean ownsShard) {
         if (deleted.get()) { // we remove that shards content if this index has been deleted
             try {
-                indicesServices.deleteShardStore("delete index", lock, indexSettings);
+                if (ownsShard) {
+                    indicesServices.deleteShardStore("delete index", lock, indexSettings);
+                }
             } catch (IOException e) {
                 logger.warn("{} failed to delete shard content", e, lock.getShardId());
             }

@@ -442,15 +445,17 @@ private void onShardClose(ShardLock lock) {

     private class StoreCloseListener implements Store.OnClose {
         private final ShardId shardId;
+        private final boolean ownsShard;

-        public StoreCloseListener(ShardId shardId) {
+        public StoreCloseListener(ShardId shardId, boolean ownsShard) {
             this.shardId = shardId;
+            this.ownsShard = ownsShard;
         }

         @Override
         public void handle(ShardLock lock) {
             assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
-            onShardClose(lock);
+            onShardClose(lock, ownsShard);
         }
     }
> **Reviewer:** I think we should leave a note about the limitation of the replicas being behind mapping-wise?
>
> **Author:** I'll add a section to the documentation on this.