Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public final class IndexSettings {
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the flush threshold is below this, the flush thread
* An empty translog occupies 55 bytes on disk. If the flush threshold is below this, the flush thread
* can get stuck in an infinite loop as the shouldPeriodicallyFlush can still be true after flushing.
* However, small thresholds are useful for testing so we do not add a large lower bound here.
*/
Expand Down Expand Up @@ -220,7 +220,7 @@ public final class IndexSettings {
"index.translog.generation_threshold_size",
new ByteSizeValue(64, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the generation threshold is
* An empty translog occupies 55 bytes on disk. If the generation threshold is
* below this, the flush thread can get stuck in an infinite loop repeatedly
* rolling the generation as every new generation will already exceed the
* generation threshold. However, small thresholds are useful for testing so we
Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,13 @@ public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long ve
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}

public Index(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY);
/**
 * Test-only convenience constructor: delegates with {@link Versions#MATCH_ANY}
 * so the operation matches any existing document version.
 *
 * @param uid         the uid term identifying the document
 * @param primaryTerm the primary term to associate with this operation
 * @param doc         the parsed document to index
 */
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
this(uid, primaryTerm, doc, Versions.MATCH_ANY);
} // TEST ONLY

Index(Term uid, ParsedDocument doc, long version) {
// use a primary term of 2 to allow tests to reduce it to a valid >0 term
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
/**
 * Test-only constructor: builds a primary-origin index operation with an
 * unassigned sequence number, internal versioning, the current nanoTime as
 * start time, and no auto-generated-id timestamp (-1).
 *
 * @param uid         the uid term identifying the document
 * @param primaryTerm the primary term to associate with this operation
 * @param doc         the parsed document to index
 * @param version     the explicit version to use
 */
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
} // TEST ONLY

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1147,8 +1146,8 @@ public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, lo
this.id = Objects.requireNonNull(id);
}

public Delete(String type, String id, Term uid) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
/**
 * Convenience constructor: builds a primary-origin delete operation with an
 * unassigned sequence number, {@link Versions#MATCH_ANY}, internal versioning,
 * and the current nanoTime as start time.
 *
 * @param type        the document type
 * @param id          the document id
 * @param uid         the uid term identifying the document
 * @param primaryTerm the primary term to associate with this operation
 */
public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}

public Delete(Delete template, VersionType versionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -125,7 +126,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand All @@ -152,6 +153,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
}

/**
Expand Down Expand Up @@ -354,4 +356,11 @@ public Sort getIndexSort() {
public CircuitBreakerService getCircuitBreakerService() {
return this.circuitBreakerService;
}

/**
 * The supplier from which the current primary term of the associated shard
 * can be obtained.
 *
 * @return the primary-term supplier configured for this engine
 */
public LongSupplier getPrimaryTermSupplier() {
    return this.primaryTermSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker);
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
protected final long generation;
protected final FileChannel channel;
protected final Path path;
protected final long firstOperationOffset;
protected final TranslogHeader header;

public BaseTranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset) {
public BaseTranslogReader(long generation, FileChannel channel, Path path, TranslogHeader header) {
assert Translog.parseIdFromFileName(path) == generation : "generation mismatch. Path: " + Translog.parseIdFromFileName(path) + " but generation: " + generation;

this.generation = generation;
this.path = path;
this.channel = channel;
this.firstOperationOffset = firstOperationOffset;
this.header = header;
}

public long getGeneration() {
Expand All @@ -57,7 +57,14 @@ public long getGeneration() {
abstract Checkpoint getCheckpoint();

public final long getFirstOperationOffset() {
return firstOperationOffset;
return header.sizeInBytes();
}

/**
 * The primary term recorded in this reader's translog file header.
 *
 * @return the primary term stored in the header
 */
public final long getPrimaryTerm() {
    return this.header.getPrimaryTerm();
}

/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
Expand Down Expand Up @@ -100,7 +107,12 @@ protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusabl
}

protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
return Translog.readOperation(inStream);
final Translog.Operation op = Translog.readOperation(inStream);
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) {
throw new TranslogCorruptedException("Operation's term is newer than translog header term; " +
"operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]");
}
return op;
}

/**
Expand Down
47 changes: 26 additions & 21 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;

static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogWriter.getHeaderLength(UUIDs.randomBase64UUID());
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.defaultSizeInBytes(UUIDs.randomBase64UUID());

// the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>();
Expand All @@ -122,6 +122,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;
private final String translogUUID;
private final TranslogDeletionPolicy deletionPolicy;

Expand All @@ -140,10 +141,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public Translog(
final TranslogConfig config, final String translogUUID, TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier) throws IOException {
final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add Javadoc for the term supplier? Also, describe what the term means here — i.e., it is sampled when a generation is rolled, and the translog will reject operations with a higher term until it is rolled again.

super(config.getShardId(), config.getIndexSettings());
this.config = config;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
this.deletionPolicy = deletionPolicy;
this.translogUUID = translogUUID;
bigArrays = config.getBigArrays();
Expand All @@ -165,7 +167,7 @@ public Translog(
//
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
// if not we don't even try to clean it up and wait until we fail creating it
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.defaultSizeInBytes(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
Expand Down Expand Up @@ -226,6 +228,9 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
}
final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() :
"Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "]" +
"translog path [ " + committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]";
foundTranslogs.add(reader);
logger.debug("recovered local translog from checkpoint {}", checkpoint);
}
Expand Down Expand Up @@ -269,10 +274,6 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
}

TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
return openReader(path, checkpoint, translogUUID);
}

private static TranslogReader openReader(Path path, Checkpoint checkpoint, String translogUUID) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
try {
assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: " + Translog.parseIdFromFileName(path) + " but got: " + checkpoint.generation;
Expand Down Expand Up @@ -459,7 +460,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
getChannelFactory(),
config.getBufferSize(),
initialMinTranslogGen, initialGlobalCheckpoint,
globalCheckpointSupplier, this::getMinFileGeneration);
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong());
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
Expand Down Expand Up @@ -487,6 +488,10 @@ public Location add(final Operation operation) throws IOException {
final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (operation.primaryTerm() > current.getPrimaryTerm()) {
throw new IllegalArgumentException("Operation term is newer than the current term;"
+ "current term[" + current.getPrimaryTerm() + "], operation term[" + operation + "]");
}
return current.add(bytes, operation.seqNo());
}
} catch (final AlreadyClosedException | IOException ex) {
Expand Down Expand Up @@ -993,17 +998,17 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
}

public Index(String type, String id, long seqNo, byte[] source) {
this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, null, -1);
public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) {
this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, null, -1);
}

public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source, String routing,
String parent, long autoGeneratedIdTimestamp) {
public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType,
byte[] source, String routing, String parent, long autoGeneratedIdTimestamp) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
this.primaryTerm = 0;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
this.routing = routing;
Expand Down Expand Up @@ -1188,8 +1193,8 @@ public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
}

/** utility for testing */
public Delete(String type, String id, long seqNo, Term uid) {
this(type, id, uid, seqNo, 0, Versions.MATCH_ANY, VersionType.INTERNAL);
public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) {
this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL);
}

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
Expand Down Expand Up @@ -1393,10 +1398,10 @@ public enum Durability {

}

private static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
// This absolutely must come first, or else reading the checksum becomes part of the checksum
long expectedChecksum = in.getChecksum();
long readChecksum = in.readInt() & 0xFFFF_FFFFL;
long readChecksum = Integer.toUnsignedLong(in.readInt());
if (readChecksum != expectedChecksum) {
throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" +
Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum));
Expand Down Expand Up @@ -1679,10 +1684,10 @@ static Checkpoint readCheckpoint(final Path location) throws IOException {
*/
public static long readGlobalCheckpoint(final Path location, final String expectedTranslogUUID) throws IOException {
final Checkpoint checkpoint = readCheckpoint(location);
// We need to open at least translog reader to validate the translogUUID.
// We need to open at least one translog header to validate the translogUUID.
final Path translogFile = location.resolve(getFilename(checkpoint.generation));
try (TranslogReader reader = openReader(translogFile, checkpoint, expectedTranslogUUID)) {

try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader.read(expectedTranslogUUID, translogFile, channel);
} catch (TranslogCorruptedException ex) {
throw ex; // just bubble up.
} catch (Exception ex) {
Expand Down Expand Up @@ -1723,7 +1728,7 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S
final String translogUUID = UUIDs.randomBase64UUID();
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, 0L
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we make callers pass the term to this method? If it's difficult, it's not a big deal, but I feel it would be good to have the right term in the file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed ad5a212

);
writer.close();
return translogUUID;
Expand Down
Loading