Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq

public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
final long primaryTerm = request.primaryTerm();
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
Expand All @@ -457,10 +458,12 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica);
operationResult =
executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, primaryTerm, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica);
operationResult =
executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, primaryTerm, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down Expand Up @@ -529,13 +532,13 @@ private static Translog.Location locationToSync(Translog.Location current,
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
private static Engine.IndexResult executeIndexRequestOnReplica(
DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
DocWriteResponse primaryResponse,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you maybe un-indent this into a single line?

IndexRequest request,
long primaryTerm, IndexShard replica) throws IOException {

final Engine.Index operation;
try {
operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
operation = prepareIndexOperationOnReplica(primaryResponse, request, primaryTerm, replica);
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
}
Expand All @@ -553,6 +556,7 @@ private static Engine.IndexResult executeIndexRequestOnReplica(
static Engine.Index prepareIndexOperationOnReplica(
DocWriteResponse primaryResponse,
IndexRequest request,
long primaryTerm,
IndexShard replica) {

final ShardId shardId = replica.shardId();
Expand All @@ -565,7 +569,7 @@ static Engine.Index prepareIndexOperationOnReplica(
final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
assert versionType.validateVersionForWrites(version);

return replica.prepareIndexOnReplica(sourceToParse, seqNo, version, versionType,
return replica.prepareIndexOnReplica(sourceToParse, seqNo, primaryTerm, version, versionType,
request.getAutoGeneratedTimestamp(), request.isRetry());
}

Expand Down Expand Up @@ -647,7 +651,7 @@ private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest r
}

private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
IndexShard replica) throws Exception {
final long primaryTerm, IndexShard replica) throws Exception {
if (replica.indexSettings().isSingleType()) {
// We need to wait for the replica to have the mappings
Mapping update;
Expand All @@ -667,7 +671,7 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons
final long version = primaryResponse.getVersion();
assert versionType.validateVersionForWrites(version);
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
primaryResponse.getSeqNo(), primaryTerm, version, versionType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's trappy that the IndexRequest/DeleteRequest object might not have a primaryTerm properly set if it's wrapped in a BulkShardRequest. Maybe we could override BulkShardRequest.primaryTerm(long) to set the primary term of the inner objects until we have fixed this discrepancy?

return replica.delete(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,11 @@ public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, Ve
}
}

public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long primaryTerm, long version, VersionType versionType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this just primaryTerm is confusing (in light of a future PR that uses this code during resync). Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated

Actually it's the term of the op in the log and not the authority of the primary that sends this op (that one goes into the permit method). I guess this proves your point about it being confusing. Any suggestions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spoke to Yannick on a different channel. We decided to rename the parameter to opPrimaryTerm and relax the assertion to reflect its semantics (as opposed to its current usage).

long autoGeneratedIdTimestamp, boolean isRetry) {
try {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a hard exception? something is bloody wrong here if this happens? maybe IllegalStateException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is an interesting one - yes things are totally fucked up. I'm torn as to whether to change it - with assertions, nodes die / tests fail with the right exception as we never catch them. An IllegalStateException will cause the shard to be failed and then the failure may cascade further. I'm tempted to keep it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be kept as is exactly for the reason that @bleskes mentions regarding the consequences in production code of changing this to a hard failure and the possible loss of visibility when tests are running. This really should never happen.

return prepareIndex(docMapper(source.type()), source, seqNo, primaryTerm, version, versionType,
Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry);
} catch (Exception e) {
Expand Down Expand Up @@ -597,6 +598,7 @@ public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here?

final Term uid = extractUidForDelete(type, id);
return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
}
Expand Down Expand Up @@ -1879,8 +1881,9 @@ public void acquireReplicaOperationPermit(
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a message to this assertion while you are at it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

primaryTerm = operationPrimaryTerm;
getEngine().getTranslog().rollGeneration();
});
} catch (final InterruptedException | TimeoutException e) {
} catch (final InterruptedException | TimeoutException | IOException | AlreadyClosedException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just catch (Exception)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this just got out of hand.

onPermitAcquired.onFailure(e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
Expand Down Expand Up @@ -70,7 +71,8 @@ public void close() {
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation permit has been closed
*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
InterruptedException, TimeoutException, E {
if (closed) {
throw new IndexShardClosedException(shardId);
}
Expand Down Expand Up @@ -109,9 +111,9 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)

/**
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* longer blocked.
* {@link ActionListener} will be called on the calling thread. During calls of
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
* then be called using the provided executor once operations are no longer blocked.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
Expand Down
32 changes: 21 additions & 11 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -910,11 +910,11 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}

public Index(String type, String id, byte[] source) {
public Index(String type, String id, long seqNo, byte[] source) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = 0;
this.seqNo = seqNo;
version = Versions.MATCH_ANY;
versionType = VersionType.INTERNAL;
routing = null;
Expand Down Expand Up @@ -1037,9 +1037,11 @@ public int hashCode() {
@Override
public String toString() {
return "Index{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
'}';
"id='" + id + '\'' +
", type='" + type + '\'' +
", seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
'}';
}

public long getAutoGeneratedIdTimestamp() {
Expand Down Expand Up @@ -1079,8 +1081,8 @@ public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
}

/** utility for testing */
public Delete(String type, String id, Term uid) {
this(type, id, uid, 0, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
public Delete(String type, String id, long seqNo, Term uid) {
this(type, id, uid, seqNo, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
}

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
Expand Down Expand Up @@ -1180,10 +1182,11 @@ public int hashCode() {
@Override
public String toString() {
return "Delete{" +
"uid=" + uid +
'}';
"uid=" + uid +
", seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
'}';
}

}

public static class NoOp implements Operation {
Expand Down Expand Up @@ -1260,9 +1263,16 @@ public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this hashcode looks odd....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. I generated a new one.

}

@Override
public String toString() {
return "NoOp{" +
"seqNo=" + seqNo +
", primaryTerm=" + primaryTerm +
", reason='" + reason + '\'' +
'}';
}
}


public enum Durability {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -39,6 +41,8 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -71,6 +75,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object();

private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

private TranslogWriter(
final ChannelFactory channelFactory,
final ShardId shardId,
Expand All @@ -90,6 +96,13 @@ private TranslogWriter(
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
boolean assertionsEnabled = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert (seenSequenceNumbers = new HashMap<>()) != null; if you wanna have a one-liner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then I think I can't make seenSequenceNumbers final which is a shame...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please integrate #24834 and use it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

integrated. Nice work.

assert assertionsEnabled = true;
if (assertionsEnabled) {
seenSequenceNumbers = new HashMap<>();
} else {
seenSequenceNumbers = null;
}
}

static int getHeaderLength(String translogUUID) {
Expand Down Expand Up @@ -195,9 +208,30 @@ public synchronized Translog.Location add(final BytesReference data, final long

operationCounter++;

assert assertSeqNoNotSeen(seqNo, data);

return new Translog.Location(generation, offset, data.length());
}

private boolean assertSeqNoNotSeen(long seqNo, BytesReference data) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's only called in one place but can you make it synchronized just for readability?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// nothing to do
} else if (seenSequenceNumbers.containsKey(seqNo)) {
final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not StackTraceElement[]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way so it can easily integrate into a caused by clause and have a proper message. Do you have a better suggestion there?

if (previous.v1().equals(data) == false) {
Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput()));
Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput()));
throw new AssertionError(
"seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " +
"prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
}
} else {
seenSequenceNumbers.put(seqNo,
new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not Thread.currentThread().getStackTrace()?

}
return true;
}

/**
* write all buffered ops to disk and fsync file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,17 @@ public void testPrepareIndexOpOnReplica() throws Exception {
IndexMetaData metaData = indexMetaData();
IndexShard shard = newStartedShard(false);

DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, randomBoolean());
DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 17, 0, 1, randomBoolean());
IndexRequest request = new IndexRequest("index", "type", "id")
.source(Requests.INDEX_CONTENT_TYPE, "field", "value");

Engine.Index op = TransportShardBulkAction.prepareIndexOperationOnReplica(
primaryResponse, request, shard);
primaryResponse, request, shard.getPrimaryTerm(), shard);

assertThat(op.version(), equalTo(primaryResponse.getVersion()));
assertThat(op.seqNo(), equalTo(primaryResponse.getSeqNo()));
assertThat(op.versionType(), equalTo(VersionType.EXTERNAL));
assertThat(op.primaryTerm(), equalTo(shard.getPrimaryTerm()));

closeShards(shard);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2730,7 +2730,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
null,
() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();

Expand Down Expand Up @@ -3015,8 +3015,8 @@ public void testDoubleDeliveryPrimary() throws IOException {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
operation = randomAppendOnly(doc, false, 1);
retry = randomAppendOnly(doc, true, 1);
operation = appendOnlyPrimary(doc, false, 1);
retry = appendOnlyPrimary(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
Expand Down Expand Up @@ -3328,10 +3328,11 @@ public void testAppendConcurrently() throws InterruptedException, IOException {
int numDocs = randomIntBetween(1000, 10000);
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
boolean primary = randomBoolean();
List<Engine.Index> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index index = randomAppendOnly(doc, false, i);
Engine.Index index = primary ? appendOnlyPrimary(doc, false, i) : appendOnlyReplica(doc, false, i, i);
docs.add(index);
}
Collections.shuffle(docs, random());
Expand Down
Loading