Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
Expand Down Expand Up @@ -163,12 +162,6 @@ public InternalEngine(EngineConfig engineConfig) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand All @@ -183,28 +176,37 @@ public InternalEngine(EngineConfig engineConfig) {
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats;
final boolean shouldCreateIndex;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
shouldCreateIndex = false;
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
shouldCreateIndex = false;
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
shouldCreateIndex = true;
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that you had to do this sucks. I think we should make createWriter static and pass all the things to it that it needs rather than expecting members to be initialized. this should be done in a followup

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I will address this in a followup.

new SnapshotDeletionPolicy(new KeepUntilGlobalCheckpointDeletionPolicy(seqNoService::getGlobalCheckpoint)),
translogDeletionPolicy, openMode);
writer = createWriter(shouldCreateIndex);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
* An {@link IndexDeletionPolicy} keeps the oldest commit whose max sequence number is not
* greater than the current global checkpoint, and also keeps all subsequent commits. Once those
* commits are kept, a {@link CombinedDeletionPolicy} will retain translog operations at least up to
* the current global checkpoint.
*/
public final class KeepUntilGlobalCheckpointDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;

public KeepUntilGlobalCheckpointDeletionPolicy(LongSupplier globalCheckpointSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
if (commits.isEmpty() == false) {
onCommit(commits);
}
}

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false;
final int keptIndex = indexOfKeptCommits(commits);
for (int i = 0; i < keptIndex; i++) {
commits.get(i).delete();
}
assert commits.get(commits.size() - 1).isDeleted() == false : "The last commit must not be deleted";
}

private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
// Commits are sorted by age (the 0th one is the oldest commit).
for (int i = commits.size() - 1; i >= 0; i--) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep this simple and just try to find the offset we need to delete from and then exit the loop. Then do the delete in a second loop outside of it. It would make it much simpler to read. Also I think we can safely iterate from 0 to N for simplicity. This is not perf critical in that place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 9ee44d2

final Map<String, String> commitUserData = commits.get(i).getUserData();
// Index from 5.x does not contain MAX_SEQ_NO, we should keep either the more recent commit with MAX_SEQ_NO,
// or the last commit.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is correct — shouldn't we keep the commit with no seq# info? It means the commit was made before sequence numbers were introduced and so implicitly has no ops above the global checkpoint. That would also mean we can just return i?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both Math.min(i + 1, commits.size() - 1) and i are correct. Returning i is much simpler, but Math.min(i + 1, commits.size() - 1) will clean up unneeded commits sooner. It can be explained as follows. We have one commit (c1) without max_seq_no (from 5.x), then we have a new commit (c2) with max_seq_no. We don't need to keep the former commit (c1) if we keep c2. Returning i will keep both commits, but Math.min(i + 1, commits.size() - 1) will keep only c2.

However, I believe I overthought it. I pushed 3d5d323 to remove this optimization.

return Math.min(i + 1, commits.size() - 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
return i;
}
}
return -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should hunt down when this is possible - I can't think of a case where an existing (i.e. was fully initialized and it's translog committed) has this trait. A peer recovery index (where we create the translog) may have unknown GCP (and a single commit point which we can assert on). Instead of blindly accepting the fact that we have found no commit point, can we maybe rely on the fact that the GCP is UNASSIGNED_SEQ_NO and otherwise throw an exception? Re empty indices - I'm not sure about the initialization order - we should check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not possible before but can happen with a new limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may happen when we upgrade from previous 6.x versions. In previous 6.x, we kept only the last commit — and the max_seq_no of that commit is likely greater than the global checkpoint if indexing operations are in progress. Therefore, after upgrading to this version, we may not find a proper commit (e.g. one whose max_seq_no is less than or equal to the current global checkpoint) for an old index until we have reserved proper commits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Can we assert that's the case? i.e. give the deletion policy the index creation version and assert that the index was created before 6.x and that the commit has MAX_SEQ_NO in it? also please add a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also happen in peer recovery. If a file-based recovery happens, a replica will receive the latest commit from the primary. However, that commit may not be a safe commit if writes are in progress.

I've documented these two cases. I think we need to discuss on the assertion.

}

}
Loading