Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
05975af
Add ShadowEngine
dakrone Jan 12, 2015
38135af
make tests pass
s1monw Feb 10, 2015
7fcb373
make test more evil
s1monw Feb 10, 2015
be02cab
Add test that restarts nodes to ensure shadow replicas recover
dakrone Feb 11, 2015
343dc0b
long adder is not available in java7
s1monw Feb 11, 2015
2a2eed1
Merge branch 'master' into shadow-replicas
s1monw Feb 11, 2015
24d36c9
utilize the new delete code
s1monw Feb 11, 2015
2d42736
shortcut recovery if we are on a shared FS - no need to compare files…
s1monw Feb 11, 2015
ca9beb2
Merge branch 'master' into shadow-replicas
s1monw Feb 11, 2015
67d7df4
Add start of ShadowEngine unit tests
dakrone Feb 11, 2015
1896fed
Add testShadowEngineIgnoresWriteOperations and testSearchResultRelease
dakrone Feb 11, 2015
a95adbe
Remove tests that don't apply to ShadowEngine
dakrone Feb 11, 2015
52e9cd1
Add a test for replica -> primary promotion
dakrone Feb 11, 2015
2378fbb
Fix missing import
dakrone Feb 11, 2015
5e33eea
Remove overly-complex test
dakrone Feb 11, 2015
80cf0e8
Remove nocommit in ShadowEngineTests#testFailStart()
dakrone Feb 11, 2015
e4dbfb0
Fix segment info for ShadowEngine, remove test nocommit
dakrone Feb 11, 2015
06e2eb4
Add a test checking that indices with shadow replicas clean up after …
dakrone Feb 11, 2015
fdbe413
Use check for shared filesystem in primary -> primary relocation
dakrone Feb 11, 2015
5689b7d
Add testShadowReplicaNaturalRelocation
dakrone Feb 11, 2015
cf2fb80
Make assertPathHasBeenCleared recursive
dakrone Feb 11, 2015
4a367c0
fix primary promotion
s1monw Feb 12, 2015
f229719
first cut at catchup from primary
s1monw Feb 12, 2015
abda780
Merge branch 'master' into shadow-replicas
s1monw Feb 12, 2015
a62b9a7
fix compile error after upstream changes
s1monw Feb 12, 2015
a7eb53c
Simplify shared filesystem recovery by using a dedicated recovery han…
s1monw Feb 12, 2015
d8d59db
Refactor more shared methods into the abstract Engine
dakrone Feb 12, 2015
28a9d18
Remove nocommit, document canDeleteIndexContents
dakrone Feb 12, 2015
4f71c8d
Add documentation to ShadowEngine
dakrone Feb 12, 2015
ea4e3e5
Add documentation to ShadowIndexShard, remove nocommit
dakrone Feb 12, 2015
aea9692
revert unneeded changes on Store
s1monw Feb 13, 2015
83d171c
Merge branch 'master' into shadow-replicas
s1monw Feb 13, 2015
213292e
add one more nocommit
s1monw Feb 13, 2015
e5bc047
Don't replicate document request when using shadow replicas
dakrone Feb 13, 2015
740c28d
Merge branch 'master' into shadow-replicas
dakrone Feb 13, 2015
2ae80f9
throw UnsupportedOperationException on write operations in ShadowEngine
dakrone Feb 13, 2015
c4c999d
Merge branch 'master' into shadow-replicas
s1monw Feb 16, 2015
28f069b
fix primary relocation
s1monw Feb 16, 2015
82b9f04
reduce the changes compared to master
s1monw Feb 16, 2015
abb6965
remove nocommit and simplify delete logic
s1monw Feb 16, 2015
d77414c
remove nocommits in IndexMetaData
s1monw Feb 17, 2015
48a700d
add test for failing shadow engine / remove nocommit
s1monw Feb 17, 2015
e8ad614
Merge branch 'master' into shadow-replicas
s1monw Feb 17, 2015
77fba57
Merge branch 'master' into shadow-replicas
s1monw Feb 17, 2015
11886b7
Back out non-shared FS code. this will go in in a second iteration
s1monw Feb 17, 2015
744f228
revert changes to IndexShardGateway - these are leftovers from previo…
s1monw Feb 17, 2015
45cd34a
fix tests
s1monw Feb 17, 2015
c5ece6d
simplify shadow engine
s1monw Feb 17, 2015
eb699c1
remove last nocommit
s1monw Feb 17, 2015
4473e63
Add asciidoc documentation for shadow replicas
dakrone Feb 17, 2015
b9d1fed
Merge remote-tracking branch 'refs/remotes/origin/master' into shadow…
dakrone Feb 17, 2015
0b1b852
Merge remote-tracking branch 'refs/remotes/origin/master' into shadow…
dakrone Feb 17, 2015
a1b8b8c
Remove unused import and fix index creation example in docs
dakrone Feb 17, 2015
b64fef1
Add warning that predefined mappings should be used
dakrone Feb 18, 2015
23001af
Remove ShadowEngineFactory, add .newReadOnlyEngine method in EngineFa…
dakrone Feb 18, 2015
d90d698
Rename `ownsShard` to `canDeleteShardContent`
dakrone Feb 18, 2015
7346f9f
Revert changes to RecoveryTarget.java
dakrone Feb 18, 2015
60a4d53
Add a test for shadow replicas that uses field data
dakrone Feb 18, 2015
c8e8db4
Clarify comment about pre-defined mappings
dakrone Feb 18, 2015
73c62df
Add MockShadowEngine and hook it up to be used
dakrone Feb 18, 2015
1a0d456
Rename usesSharedFilesystem -> isOnSharedFilesystem
dakrone Feb 18, 2015
62b0c28
Use IndexMetaData.isIndexUsingShadowReplicas helper
dakrone Feb 18, 2015
67a797a
Factor out AssertingSearcher so it can be used by mock Engines
dakrone Feb 18, 2015
edd4943
Move engine creation into protected createNewEngine method
dakrone Feb 18, 2015
325acbe
Use ?preference=_primary automatically for realtime GET operations
dakrone Feb 18, 2015
2083503
Use Enum for "_primary" preference
dakrone Feb 18, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/reference/indices.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ and warmers.
* <<indices-templates>>
* <<indices-warmers>>

[float]
[[shadow-replicas]]
== Replica configurations
* <<indices-shadow-replicas>>

[float]
[[monitoring]]
== Monitoring:
Expand Down Expand Up @@ -107,3 +112,4 @@ include::indices/optimize.asciidoc[]

include::indices/upgrade.asciidoc[]

include::indices/shadow-replicas.asciidoc[]
105 changes: 105 additions & 0 deletions docs/reference/indices/shadow-replicas.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
[[indices-shadow-replicas]]
== Shadow replica indices

experimental[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this!


If you would like to use a shared filesystem, you can use the shadow replicas
settings to choose where on disk the data for an index should be kept, as well
as how Elasticsearch should replay operations on all the replica shards of an
index.

In order to fully utilize the `index.data_path` and `index.shadow_replicas`
settings, you need to enable using it in elasticsearch.yml:

[source,yaml]
--------------------------------------------------
node.enable_custom_paths: true
--------------------------------------------------

You can then create an index with a custom data path, where each node will use
this path for the data:

[WARNING]
========================
Because shadow replicas do not index the document on replica shards, it's
possible for the replica's known mapping to be behind the index's known mapping
if the latest cluster state has not yet been processed on the node containing
the replica. Because of this, it is highly recommended to use pre-defined
mappings when using shadow replicas.
========================

[source,js]
--------------------------------------------------
curl -XPUT 'localhost:9200/my_index' -d '
{
"index" : {
"number_of_shards" : 1,
"number_of_replicas" : 4,
"data_path": "/var/data/my_index",
"shadow_replicas": true
}
}'
--------------------------------------------------

[WARNING]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave a note about the limitation of the replicas being behind mapping wise?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a section to the documentation on this.

========================
In the above example, the "/var/data/my_index" path is a shared filesystem that
must be available on every node in the Elasticsearch cluster. You must also
ensure that the Elasticsearch process has the correct permissions to read from
and write to the directory used in the `index.data_path` setting.
========================

An index that has been created with the `index.shadow_replicas` setting set to
"true" will not replicate document operations to any of the replica shards,
instead, it will only continually refresh. Once segments are available on the
filesystem where the shadow replica resides (after an Elasticsearch "flush"), a
regular refresh (governed by the `index.refresh_interval`) can be used to make
the new data searchable.

NOTE: Since documents are only indexed on the primary shard, realtime GET
requests could fail to return a document if executed on the replica shard,
therefore, GET API requests automatically have the `?preference=_primary` flag
set if there is no preference flag already set.

In order to ensure the data is being synchronized in a fast enough manner, you
may need to tune the flush threshold for the index to a desired number. A flush
is needed to fsync segment files to disk, so they will be visible to all other
replica nodes. Users should test what flush threshold levels they are
comfortable with, as increased flushing can impact indexing performance.

The Elasticsearch cluster will still detect the loss of a primary shard, and
transform the replica into a primary in this situation. This transformation will
take slightly longer, since no `IndexWriter` is maintained for each shadow
replica.

Below is the list of settings that can be changed using the update
settings API:

`index.data_path` (string)::
Path to use for the index's data. Note that by default Elasticsearch will
append the node ordinal by default to the path to ensure multiple instances
of Elasticsearch on the same machine do not share a data directory.

`index.shadow_replicas`::
Boolean value indicating this index should use shadow replicas. Defaults to
`false`.

`index.shared_filesystem`::
Boolean value indicating this index uses a shared filesystem. Defaults to
the `true` if `index.shadow_replicas` is set to true, `false` otherwise.

=== Node level settings related to shadow replicas

These are non-dynamic settings that need to be configured in `elasticsearch.yml`

`node.add_id_to_custom_path`::
Boolean setting indicating whether Elasticsearch should append the node's
ordinal to the custom data path. For example, if this is enabled and a path
of "/tmp/foo" is used, the first locally-running node will use "/tmp/foo/0",
the second will use "/tmp/foo/1", the third "/tmp/foo/2", etc. Defaults to
`true`.

`node.enable_custom_paths`::
Boolean value that must be set to `true` in order to use the
`index.data_path` setting. Defaults to `false`.

14 changes: 12 additions & 2 deletions src/main/java/org/elasticsearch/action/get/TransportGetAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -74,6 +76,14 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
if (request.request().realtime == null) {
request.request().realtime = this.realtime;
}
IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
if (request.request().realtime && // if the realtime flag is set
request.request().preference() == null && // the preference flag is not already set
indexMeta != null && // and we have the index
IndexMetaData.isIndexUsingShadowReplicas(indexMeta.settings())) { // and the index uses shadow replicas
// set the preference for the request to use "_primary" automatically
request.request().preference(Preference.PRIMARY.type());
}
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -631,9 +631,23 @@ void performOnReplica(final ReplicationState state, final ShardRouting shard, fi
}

final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());

// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
state.onReplicaSuccess();
return;
}

if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public static State fromString(String state) {

public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final String SETTING_READ_ONLY = "index.blocks.read_only";
public static final String SETTING_BLOCKS_READ = "index.blocks.read";
Expand Down Expand Up @@ -784,4 +786,25 @@ public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws
}
}
}

/**
* Returns <code>true</code> iff the given settings indicate that the index
* associated with these settings allocates it's shards on a shared
* filesystem. Otherwise <code>false</code>. The default setting for this
* is the returned value from
* {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
*/
public static boolean isOnSharedFilesystem(Settings settings) {
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
}

/**
* Returns <code>true</code> iff the given settings indicate that the index associated
* with these settings uses shadow replicas. Otherwise <code>false</code>. The default
* setting for this is <code>false</code>.
*/
public static boolean isIndexUsingShadowReplicas(Settings settings) {
return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
}

}
1 change: 1 addition & 0 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private boolean isShardLocked(ShardId id) {
*
* @param index the index to delete
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @param indexSettings settings for the index being deleted
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
Expand All @@ -54,7 +54,9 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public String indexUUID() {
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
}

public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException {
public synchronized IndexShard createShard(int sShardId, boolean primary) throws ElasticsearchException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
Expand All @@ -304,15 +304,17 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticsearchExc
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);

logger.debug("creating shard_id {}", shardId);

// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId, indexSettings));
modules.add(new IndexShardModule(shardId, primary, indexSettings));
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
new StoreCloseListener(shardId)));
new StoreCloseListener(shardId, canDeleteShardContent)));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
Expand Down Expand Up @@ -430,10 +432,12 @@ private void closeInjectorResource(ShardId shardId, Injector shardInjector, Clas
}
}

private void onShardClose(ShardLock lock) {
private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
indicesServices.deleteShardStore("delete index", lock, indexSettings);
if (ownsShard) {
indicesServices.deleteShardStore("delete index", lock, indexSettings);
}
} catch (IOException e) {
logger.warn("{} failed to delete shard content", e, lock.getShardId());
}
Expand All @@ -442,15 +446,17 @@ private void onShardClose(ShardLock lock) {

private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;

public StoreCloseListener(ShardId shardId) {
public StoreCloseListener(ShardId shardId, boolean ownsShard) {
this.shardId = shardId;
this.ownsShard = ownsShard;
}

@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock);
onShardClose(lock, ownsShard);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public String[] getFiles() {
}

/**
* Releases the current snapshot, returning <code>true</code> if it was
* actually released.
* Releases the current snapshot.
*/
public void close() {
deletionPolicy.close(getGeneration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@
*/
public interface EngineFactory {

public Engine newEngine(EngineConfig config);
public Engine newReadWriteEngine(EngineConfig config);

public Engine newReadOnlyEngine(EngineConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

public class InternalEngineFactory implements EngineFactory {
@Override
public Engine newEngine(EngineConfig config) {
public Engine newReadWriteEngine(EngineConfig config) {
return new InternalEngine(config);
}

@Override
public Engine newReadOnlyEngine(EngineConfig config) {
return new ShadowEngine(config);
}
}
Loading