Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 157 additions & 136 deletions core/src/main/java/org/elasticsearch/ElasticsearchException.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ public void writeException(Throwable throwable) throws IOException {
writeVInt(17);
} else {
ElasticsearchException ex;
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass())) {
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) {
ex = (ElasticsearchException) throwable;
} else {
ex = new NotSerializableExceptionWrapper(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,36 @@

package org.elasticsearch.env;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

/**
* Exception used when the in-memory lock for a shard cannot be obtained
*/
public class ShardLockObtainFailedException extends Exception {
private final ShardId shardId;
public class ShardLockObtainFailedException extends ElasticsearchException {

public ShardLockObtainFailedException(ShardId shardId, String message) {
super(message);
this.shardId = shardId;
super(buildMessage(shardId, message));
this.setShard(shardId);
}

public ShardLockObtainFailedException(ShardId shardId, String message, Throwable cause) {
super(message, cause);
this.shardId = shardId;
super(buildMessage(shardId, message), cause);
this.setShard(shardId);
}

public ShardLockObtainFailedException(StreamInput in) throws IOException {
super(in);
}

@Override
public String getMessage() {
private static String buildMessage(ShardId shardId, String message) {
StringBuilder sb = new StringBuilder();
sb.append(shardId.toString());
sb.append(": ");
sb.append(super.getMessage());
sb.append(message);
return sb.toString();
}
}
2 changes: 0 additions & 2 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,6 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
logger.info("Failed to open / find files while reading metadata snapshot");
} catch (ShardLockObtainFailedException ex) {
logger.info((Supplier<?>) () -> new ParameterizedMessage("{}: failed to obtain shard lock", shardId), ex);
}
return MetadataSnapshot.EMPTY;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tricky change to make and I want to understand the full consequences first. If I follow the flow of the code correctly, this code is called when a master wants to allocate a replica and checks if there are nodes that have data for the replica. Making this change here means that the fetching for this node will be considered as failed and will trigger a reroute in AsyncShardFetch. The replica might however still be allocated in ReplicaShardAllocator to another node or later allocated on any node in BalancedShardsAllocator. I'm not sure if that's the change we want. Can you leave this code as is for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok sure - I copied this from your commit 👯‍♂️

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.AlreadyExpiredException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.RecoveryEngineException;
Expand Down Expand Up @@ -107,6 +108,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;

public class ExceptionSerializationTests extends ESTestCase {

Expand Down Expand Up @@ -160,10 +162,10 @@ private void checkClass(Class<?> clazz) {
if (isEsException(clazz) == false) {
return;
}
if (ElasticsearchException.isRegistered(clazz.asSubclass(Throwable.class)) == false
if (ElasticsearchException.isRegistered(clazz.asSubclass(Throwable.class), Version.CURRENT) == false
&& ElasticsearchException.class.equals(clazz.getEnclosingClass()) == false) {
notRegistered.add(clazz);
} else if (ElasticsearchException.isRegistered(clazz.asSubclass(Throwable.class))) {
} else if (ElasticsearchException.isRegistered(clazz.asSubclass(Throwable.class), Version.CURRENT)) {
registered.add(clazz);
try {
if (clazz.getMethod("writeTo", StreamOutput.class) != null) {
Expand Down Expand Up @@ -218,10 +220,17 @@ public TestException(StreamInput in) throws IOException {
}

private <T extends Exception> T serialize(T exception) throws IOException {
ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersion(random()), exception);
return serialize(exception, VersionUtils.randomVersion(random()));
}

private <T extends Exception> T serialize(T exception, Version version) throws IOException {
ElasticsearchAssertions.assertVersionSerializable(version, exception);
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);
out.writeException(exception);

StreamInput in = out.bytes().streamInput();
in.setVersion(version);
return in.readException();
}

Expand Down Expand Up @@ -769,6 +778,7 @@ public void testIds() {
ids.put(144, org.elasticsearch.cluster.NotMasterException.class);
ids.put(145, org.elasticsearch.ElasticsearchStatusException.class);
ids.put(146, org.elasticsearch.tasks.TaskCancelledException.class);
ids.put(147, org.elasticsearch.env.ShardLockObtainFailedException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down Expand Up @@ -826,4 +836,28 @@ public void testElasticsearchRemoteException() throws IOException {
assertEquals(ex.status(), e.status());
assertEquals(RestStatus.TOO_MANY_REQUESTS, e.status());
}

public void testShardLockObtainFailedException() throws IOException {
ShardId shardId = new ShardId("foo", "_na_", 1);
ShardLockObtainFailedException orig = new ShardLockObtainFailedException(shardId, "boom");
Version version = VersionUtils.randomVersionBetween(random(),
Version.V_5_0_0, Version.CURRENT);
if (version.before(ElasticsearchException.V_5_1_0_UNRELEASED)) {
// remove this once 5_1_0 is released randomVersionBetween asserts that this version is in the constant table..
version = ElasticsearchException.V_5_1_0_UNRELEASED;
}
ShardLockObtainFailedException ex = serialize(orig, version);
assertEquals(orig.getMessage(), ex.getMessage());
assertEquals(orig.getShardId(), ex.getShardId());
}

public void testBWCShardLockObtainFailedException() throws IOException {
ShardId shardId = new ShardId("foo", "_na_", 1);
ShardLockObtainFailedException orig = new ShardLockObtainFailedException(shardId, "boom");
Exception ex = serialize((Exception)orig, Version.V_5_0_0);
assertThat(ex, instanceOf(NotSerializableExceptionWrapper.class));
assertEquals("shard_lock_obtain_failed_exception: [foo][1]: boom", ex.getMessage());
}


}
1 change: 1 addition & 0 deletions core/src/test/java/org/elasticsearch/VersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public void testUnknownVersions() {
assertUnknownVersion(OsStats.V_5_1_0); // once we released 5.1.0 and it's added to Version.java we need to remove this constant
assertUnknownVersion(QueryStringQueryBuilder.V_5_1_0_UNRELEASED);
assertUnknownVersion(SimpleQueryStringBuilder.V_5_1_0_UNRELEASED);
assertUnknownVersion(ElasticsearchException.V_5_1_0_UNRELEASED);
// once we released 5.0.0 and it's added to Version.java we need to remove this constant
assertUnknownVersion(Script.V_5_1_0_UNRELEASED);
// once we released 5.0.0 and it's added to Version.java we need to remove this constant
Expand Down