Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import java.util.concurrent.atomic.AtomicReference;

public class TragicExceptionHolder {
private final AtomicReference<Exception> tragedy = new AtomicReference<>();

public boolean setTragicException(Exception ex){
assert ex != null;
if (tragedy.compareAndSet(null, ex)) {
tragedy.set(ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why double-set here? Isn't the previous line already doing this?

return true;
} else {
if (tragedy.get() != ex) { //to ensure there is no self-suppression
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add space after //

tragedy.get().addSuppressed(ex);
}
return false;
}
}

public Exception get() {
return tragedy.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -117,6 +118,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final Path location;
private TranslogWriter current;

protected final TragicExceptionHolder tragedy = new TragicExceptionHolder();
private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -310,8 +312,28 @@ public boolean isOpen() {
return closed.get() == false;
}

private static boolean calledFromOutsideOrViaTragedyClose() {
List<StackTraceElement> frames = Stream.of(Thread.currentThread().getStackTrace()).
skip(3). //skip getStackTrace, current method and close method frames
limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils
filter(f ->
{
try {
return Translog.class.isAssignableFrom(Class.forName(f.getClassName()));
} catch (Exception ignored) {
return false;
}
}
). //find all inner callers including Translog subclasses
collect(Collectors.toList());
//the list of inner callers should be either empty or should contain closeOnTragicEvent method
return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent"));
}

@Override
public void close() throws IOException {
assert calledFromOutsideOrViaTragedyClose() :
"Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
try {
Expand Down Expand Up @@ -462,7 +484,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
getChannelFactory(),
config.getBufferSize(),
initialMinTranslogGen, initialGlobalCheckpoint,
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong());
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
Expand Down Expand Up @@ -726,7 +748,8 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
}
} catch (IOException e) {
IOUtils.closeWhileHandlingException(newReaders);
close();
tragedy.setTragicException(e);
closeOnTragicEvent(e);
throw e;
}

Expand Down Expand Up @@ -779,10 +802,10 @@ public boolean ensureSynced(Stream<Location> locations) throws IOException {
*
* @param ex if an exception occurs closing the translog, it will be suppressed into the provided exception
*/
private void closeOnTragicEvent(final Exception ex) {
protected void closeOnTragicEvent(final Exception ex) {
// we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock
assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName();
if (current.getTragicException() != null) {
if (tragedy.get() != null) {
try {
close();
} catch (final AlreadyClosedException inner) {
Expand Down Expand Up @@ -1556,7 +1579,8 @@ public void rollGeneration() throws IOException {
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(this); // tragic event
tragedy.setTragicException(e);
closeOnTragicEvent(e);
throw e;
}
}
Expand Down Expand Up @@ -1669,7 +1693,7 @@ long getFirstOperationPosition() { // for testing

private void ensureOpen() {
if (closed.get()) {
throw new AlreadyClosedException("translog is already closed", current.getTragicException());
throw new AlreadyClosedException("translog is already closed", tragedy.get());
}
}

Expand All @@ -1683,7 +1707,7 @@ ChannelFactory getChannelFactory() {
* Otherwise (no tragic exception has occurred) it returns null.
*/
public Exception getTragicException() {
return current.getTragicException();
return tragedy.get();
}

/** Reads and returns the current checkpoint */
Expand Down Expand Up @@ -1766,8 +1790,8 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S
final String translogUUID = UUIDs.randomBase64UUID();
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm
);
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
new TragicExceptionHolder());
writer.close();
return translogUUID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
/* the number of translog operations written to this file */
private volatile int operationCounter;
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
private volatile Exception tragedy;
private TragicExceptionHolder tragedy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this final

/* A buffered outputstream what writes to the writers channel */
private final OutputStream outputStream;
/* the total offset of this file including the bytes written to the file as well as into the buffer */
Expand All @@ -76,7 +76,10 @@ private TranslogWriter(
final FileChannel channel,
final Path path,
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException {
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
TragicExceptionHolder tragedy)
throws
IOException {
super(initialCheckpoint.generation, channel, path, header);
assert initialCheckpoint.offset == channel.position() :
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
Expand All @@ -94,12 +97,13 @@ private TranslogWriter(
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
}

public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint,
final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
final long primaryTerm)
final long primaryTerm, TragicExceptionHolder tragedy)
throws IOException {
final FileChannel channel = channelFactory.open(file);
try {
Expand All @@ -120,7 +124,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
writerGlobalCheckpointSupplier = globalCheckpointSupplier;
}
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header);
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
Expand All @@ -129,28 +133,13 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
}
}

/**
* If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null.
*/
public Exception getTragicException() {
return tragedy;
}

private synchronized void closeWithTragicEvent(final Exception ex) {
assert ex != null;
if (tragedy == null) {
tragedy = ex;
} else if (tragedy != ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this case got lost

// it should be safe to call closeWithTragicEvents on multiple layers without
// worrying about self suppression.
tragedy.addSuppressed(ex);
}
try {
close();
} catch (final IOException | RuntimeException e) {
ex.addSuppressed(e);
if (tragedy.setTragicException(ex)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change the semantics here to only call close when setting the tragedy the first time? Let's keep the existing semantics and make setTragicException return void

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch what is the reason to call close twice if every time we set tragic exception close is called?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simpler as it decouples these two things. And no need for having this method return a boolean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch ok, it makes sense. There is already "closed" atomic that will prevent multiple closes.

try {
close();
} catch (final IOException | RuntimeException e) {
ex.addSuppressed(e);
}
}
}

Expand Down Expand Up @@ -296,7 +285,8 @@ public TranslogReader closeIntoReader() throws IOException {
if (closed.compareAndSet(false, true)) {
return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
} else {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
tragedy.get());
}
}
}
Expand Down Expand Up @@ -406,7 +396,7 @@ Checkpoint getLastSyncedCheckpoint() {

protected final void ensureOpen() {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter(final
}
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L,
() -> 1L, randomNonNegativeLong());
() -> 1L, randomNonNegativeLong(), new TragicExceptionHolder());
writer = Mockito.spy(writer);
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -1655,7 +1656,7 @@ public void testRandomExceptionsOnTrimOperations( ) throws Exception {
}

assertThat(expectedException, is(not(nullValue())));

assertThat(failableTLog.getTragicException(), equalTo(expectedException));
assertThat(fileChannels, is(not(empty())));
assertThat("all file channels have to be closed",
fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));
Expand Down Expand Up @@ -2508,8 +2509,10 @@ public void testWithRandomException() throws IOException {
// fair enough
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
assertEquals(failableTLog.getTragicException(), ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the case where we catch TranslogException | MockDirectoryWrapper.FakeIOException?

} catch (RuntimeException ex) {
assertEquals(ex.getMessage(), "simulated");
assertEquals(failableTLog.getTragicException(), ex);
} finally {
Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath());
if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) {
Expand Down Expand Up @@ -2931,6 +2934,46 @@ public void testCloseSnapshotTwice() throws Exception {
}
}

public void testTranslogCloseInvariant() throws IOException {
//close method should never be called directly from Translog (the only exception is closeOnTragicEvent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a space behind //

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also move this comment outside of the method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally, add an assumeTrue("test only works with assertions enabled", Assertions.ENABLED); to the beginning of the method

class MisbehavingTranslog extends Translog {
MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier);
}

void callCloseDirectly() throws IOException {
close();
}

void callCloseUsingIOUtilsWithExceptionHandling() {
IOUtils.closeWhileHandlingException(this);
}

void callCloseUsingIOUtils() throws IOException {
IOUtils.close(this);
}

void callCloseOnTragicEvent() {
Exception e = new Exception("test tragic exception");
tragedy.setTragicException(e);
closeOnTragicEvent(e);
}
}


globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path path = createTempDir();
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get);

expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly());
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils());
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtilsWithExceptionHandling());
misbehavingTranslog.callCloseOnTragicEvent();
}

static class SortedSnapshot implements Translog.Snapshot {
private final Translog.Snapshot snapshot;
private List<Translog.Operation> operations = null;
Expand Down