From c330ec8cc71ded0789f6d96786a13047f7d95996 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 6 Aug 2018 19:46:16 +0200 Subject: [PATCH 01/12] Move tragedy exception from TranslogWriter to Translog --- .../index/translog/Translog.java | 23 ++++++---- .../index/translog/TranslogWriter.java | 44 ++++++++----------- .../translog/TranslogDeletionPolicyTests.java | 3 +- 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 04744bc68c492..027dc0ababfcc 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -61,6 +61,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongSupplier; @@ -117,6 +118,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; + private final AtomicReference tragedy = new AtomicReference<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; @@ -462,7 +464,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon getChannelFactory(), config.getBufferSize(), initialMinTranslogGen, initialGlobalCheckpoint, - globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong()); + globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy); } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -725,8 +727,9 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { newReaders.add(newReader); } } catch (IOException e) { - IOUtils.closeWhileHandlingException(newReaders); - close(); + if (tragedy.compareAndSet(null, e)) { + closeOnTragicEvent(e); + } throw e; } @@ -782,7 +785,7 @@ public boolean ensureSynced(Stream locations) throws IOException { private void closeOnTragicEvent(final Exception ex) { // we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName(); - if (current.getTragicException() != null) { + if (tragedy.get() != null) { try { close(); } catch (final AlreadyClosedException inner) { @@ -1558,7 +1561,9 @@ public void rollGeneration() throws IOException { current = createWriter(current.getGeneration() + 1); logger.trace("current translog set to [{}]", current.getGeneration()); } catch (final Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event + if (tragedy.compareAndSet(null, e)) { + closeOnTragicEvent(e); + } throw e; } } @@ -1671,7 +1676,7 @@ long getFirstOperationPosition() { // for testing private void ensureOpen() { if (closed.get()) { - throw new AlreadyClosedException("translog is already closed", current.getTragicException()); + throw new AlreadyClosedException("translog is already closed", tragedy.get()); } } @@ -1685,7 +1690,7 @@ ChannelFactory getChannelFactory() { * Otherwise (no tragic exception has occurred) it returns null. */ public Exception getTragicException() { - return current.getTragicException(); + return tragedy.get(); } /** Reads and returns the current checkpoint */ @@ -1768,8 +1773,8 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S final String translogUUID = UUIDs.randomBase64UUID(); TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory, new ByteSizeValue(10), 1, initialGlobalCheckpoint, - () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm - ); + () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, + new AtomicReference<>()); writer.close(); return translogUUID; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index c135facc67f5f..623df3692f85b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; public class TranslogWriter extends BaseTranslogReader implements Closeable { @@ -51,7 +52,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the number of translog operations written to this file */ private volatile int operationCounter; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ - private volatile Exception tragedy; + private AtomicReference tragedy; /* A buffered outputstream what writes to the writers channel */ private final OutputStream outputStream; /* the total offset of this file including the bytes written to the file as well as into the buffer */ @@ -76,7 +77,10 @@ private TranslogWriter( final FileChannel channel, final Path path, final ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException { + final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, + AtomicReference tragedy) + throws + IOException { super(initialCheckpoint.generation, channel, path, header); assert initialCheckpoint.offset == channel.position() : "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" @@ -94,12 +98,13 @@ private TranslogWriter( assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; + this.tragedy = tragedy; } public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, - final long primaryTerm) + final long primaryTerm, AtomicReference tragedy) throws IOException { final FileChannel channel = channelFactory.open(file); try { @@ -120,7 +125,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f writerGlobalCheckpointSupplier = globalCheckpointSupplier; } return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, - writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header); + writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition @@ -129,28 +134,14 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f } } - /** - * If this {@code TranslogWriter} was closed as a side-effect of a tragic exception, - * e.g. disk full while flushing a new segment, this returns the root cause exception. - * Otherwise (no tragic exception has occurred) it returns null. - */ - public Exception getTragicException() { - return tragedy; - } - private synchronized void closeWithTragicEvent(final Exception ex) { assert ex != null; - if (tragedy == null) { - tragedy = ex; - } else if (tragedy != ex) { - // it should be safe to call closeWithTragicEvents on multiple layers without - // worrying about self suppression. - tragedy.addSuppressed(ex); - } - try { - close(); - } catch (final IOException | RuntimeException e) { - ex.addSuppressed(e); + if (tragedy.compareAndSet(null, ex)) { + try { + close(); + } catch (final IOException | RuntimeException e) { + ex.addSuppressed(e); + } } } @@ -293,7 +284,8 @@ public TranslogReader closeIntoReader() throws IOException { if (closed.compareAndSet(false, true)) { return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header); } else { - throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", + tragedy.get()); } } } @@ -403,7 +395,7 @@ Checkpoint getLastSyncedCheckpoint() { protected final void ensureOpen() { if (isClosed()) { - throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy); + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy.get()); } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 9ae502fecb580..79cc095cd8441 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -35,6 +35,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -171,7 +172,7 @@ private Tuple, TranslogWriter> createReadersAndWriter(final } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, - () -> 1L, randomNonNegativeLong()); + () -> 1L, randomNonNegativeLong(), new AtomicReference<>()); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); From bc7187ad0f62b6952169bd9bbadb8ba158e8ce62 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 6 Aug 2018 20:27:12 +0200 Subject: [PATCH 02/12] Close new readers --- .../src/main/java/org/elasticsearch/index/translog/Translog.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 027dc0ababfcc..cc17ba8d192d9 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -727,6 +727,7 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { newReaders.add(newReader); } } catch (IOException e) { + IOUtils.closeWhileHandlingException(newReaders); if (tragedy.compareAndSet(null, e)) { closeOnTragicEvent(e); } From 7d1d55099cd6e3ab507869bb6271b3593906b72d Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 7 Aug 2018 09:36:56 +0200 Subject: [PATCH 03/12] Call closeOnTragicEvent regardless of CAS result --- .../org/elasticsearch/index/translog/Translog.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index cc17ba8d192d9..0e17e837eb45e 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -728,9 +728,8 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { } } catch (IOException e) { IOUtils.closeWhileHandlingException(newReaders); - if (tragedy.compareAndSet(null, e)) { - closeOnTragicEvent(e); - } + tragedy.compareAndSet(null, e); + closeOnTragicEvent(e); throw e; } @@ -1562,9 +1561,8 @@ public void rollGeneration() throws IOException { current = createWriter(current.getGeneration() + 1); logger.trace("current translog set to [{}]", current.getGeneration()); } catch (final Exception e) { - if (tragedy.compareAndSet(null, e)) { - closeOnTragicEvent(e); - } + tragedy.compareAndSet(null, e); + closeOnTragicEvent(e); throw e; } } From e01f31a57f5214cd6b2c54954658fca8202f2786 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 7 Aug 2018 10:51:45 +0200 Subject: [PATCH 04/12] Assert that close is not called from inside Translog --- .../org/elasticsearch/index/translog/Translog.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 0e17e837eb45e..6e24314580952 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -67,6 +67,7 @@ import java.util.function.LongSupplier; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -312,8 +313,21 @@ public boolean isOpen() { return closed.get() == false; } + private boolean calledFromOutsideOrViaTragedyClose(){ + List frames = Stream.of(Thread.currentThread().getStackTrace()). + skip(3). //skip getStackTrace, current method and close method frames + limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils + filter(f -> f.getClassName().equals(Translog.class.getName())). //find all inner callers + collect(Collectors.toList()); + + //the list of inner callers should be either empty or should contain closeOnTragicEvent method + return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent")); + } + @Override public void close() throws IOException { + assert calledFromOutsideOrViaTragedyClose() : + "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { try { From 75ba1672747caa5603956c545032b1b89631a15f Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 17 Aug 2018 13:26:02 +0200 Subject: [PATCH 05/12] Fix some code review comments --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- .../java/org/elasticsearch/index/translog/TranslogWriter.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 6e24314580952..2d691e985b562 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -313,7 +313,7 @@ public boolean isOpen() { return closed.get() == false; } - private boolean calledFromOutsideOrViaTragedyClose(){ + private static boolean calledFromOutsideOrViaTragedyClose(){ List frames = Stream.of(Thread.currentThread().getStackTrace()). skip(3). //skip getStackTrace, current method and close method frames limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 623df3692f85b..1dbb2cb19caec 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -142,6 +142,8 @@ private synchronized void closeWithTragicEvent(final Exception ex) { } catch (final IOException | RuntimeException e) { ex.addSuppressed(e); } + } else { + tragedy.get().addSuppressed(ex); } } From b76b29576292b8e392871fcdb0c487164428c87e Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 17 Aug 2018 14:43:19 +0200 Subject: [PATCH 06/12] Fix some code review comments --- .../index/translog/TragicExceptionHolder.java | 43 +++++++++++++++++++ .../index/translog/Translog.java | 8 ++-- .../index/translog/TranslogWriter.java | 11 ++--- .../translog/TranslogDeletionPolicyTests.java | 2 +- .../index/translog/TranslogTests.java | 4 +- 5 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java new file mode 100644 index 0000000000000..902ff5df5a26b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import java.util.concurrent.atomic.AtomicReference; + +public class TragicExceptionHolder { + private final AtomicReference tragedy = new AtomicReference<>(); + + public boolean setTragicException(Exception ex){ + assert ex != null; + if (tragedy.compareAndSet(null, ex)) { + tragedy.set(ex); + return true; + } else { + if (tragedy.get() != ex) { //to ensure there is no self-suppression + tragedy.get().addSuppressed(ex); + } + return false; + } + } + + public Exception get() { + return tragedy.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index a8c35bdca6852..955b847c84c00 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -119,7 +119,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; - private final AtomicReference tragedy = new AtomicReference<>(); + private final TragicExceptionHolder tragedy = new TragicExceptionHolder(); private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; @@ -742,7 +742,7 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { } } catch (IOException e) { IOUtils.closeWhileHandlingException(newReaders); - tragedy.compareAndSet(null, e); + tragedy.setTragicException(e); closeOnTragicEvent(e); throw e; } @@ -1573,7 +1573,7 @@ public void rollGeneration() throws IOException { current = createWriter(current.getGeneration() + 1); logger.trace("current translog set to [{}]", current.getGeneration()); } catch (final Exception e) { - tragedy.compareAndSet(null, e); + tragedy.setTragicException(e); closeOnTragicEvent(e); throw e; } @@ -1785,7 +1785,7 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory, new ByteSizeValue(10), 1, initialGlobalCheckpoint, () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, - new AtomicReference<>()); + new TragicExceptionHolder()); writer.close(); return translogUUID; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index f8f3ebf0321f7..12b47b2366dad 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -52,7 +52,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the number of translog operations written to this file */ private volatile int operationCounter; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ - private AtomicReference tragedy; + private TragicExceptionHolder tragedy; /* A buffered outputstream what writes to the writers channel */ private final OutputStream outputStream; /* the total offset of this file including the bytes written to the file as well as into the buffer */ @@ -78,7 +78,7 @@ private TranslogWriter( final Path path, final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, - AtomicReference tragedy) + TragicExceptionHolder tragedy) throws IOException { super(initialCheckpoint.generation, channel, path, header); @@ -104,7 +104,7 @@ private TranslogWriter( public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, - final long primaryTerm, AtomicReference tragedy) + final long primaryTerm, TragicExceptionHolder tragedy) throws IOException { final FileChannel channel = channelFactory.open(file); try { @@ -135,15 +135,12 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f } private synchronized void closeWithTragicEvent(final Exception ex) { - assert ex != null; - if (tragedy.compareAndSet(null, ex)) { + if (tragedy.setTragicException(ex)) { try { close(); } catch (final IOException | RuntimeException e) { ex.addSuppressed(e); } - } else { - tragedy.get().addSuppressed(ex); } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 79cc095cd8441..0b8652dc48e77 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -172,7 +172,7 @@ private Tuple, TranslogWriter> createReadersAndWriter(final } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, - () -> 1L, randomNonNegativeLong(), new AtomicReference<>()); + () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder()); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1c27a59e0ecbe..6e2418ec92033 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1655,7 +1655,7 @@ public void testRandomExceptionsOnTrimOperations( ) throws Exception { } assertThat(expectedException, is(not(nullValue()))); - + assertThat(failableTLog.getTragicException(), equalTo(expectedException)); assertThat(fileChannels, is(not(empty()))); assertThat("all file channels have to be closed", fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false)); @@ -2508,8 +2508,10 @@ public void testWithRandomException() throws IOException { // fair enough } catch (IOException ex) { assertEquals(ex.getMessage(), "__FAKE__ no space left on device"); + assertEquals(failableTLog.getTragicException(), ex); } catch (RuntimeException ex) { assertEquals(ex.getMessage(), "simulated"); + assertEquals(failableTLog.getTragicException(), ex); } finally { Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath()); if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) { From 339aaeb587352752663f59d1bd18a13ddc9332ab Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 17 Aug 2018 15:44:42 +0200 Subject: [PATCH 07/12] Test for Translog.close invariant --- .../index/translog/Translog.java | 29 ++++++++----- .../index/translog/TranslogTests.java | 42 +++++++++++++++++++ 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 955b847c84c00..37d8d0c5d09ad 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -119,7 +119,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; - private final TragicExceptionHolder tragedy = new TragicExceptionHolder(); + protected final TragicExceptionHolder tragedy = new TragicExceptionHolder(); private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; @@ -313,15 +313,22 @@ public boolean isOpen() { return closed.get() == false; } - private static boolean calledFromOutsideOrViaTragedyClose(){ - List frames = Stream.of(Thread.currentThread().getStackTrace()). - skip(3). //skip getStackTrace, current method and close method frames - limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils - filter(f -> f.getClassName().equals(Translog.class.getName())). //find all inner callers - collect(Collectors.toList()); - - //the list of inner callers should be either empty or should contain closeOnTragicEvent method - return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent")); + private static boolean calledFromOutsideOrViaTragedyClose() { + List frames = Stream.of(Thread.currentThread().getStackTrace()). + skip(3). //skip getStackTrace, current method and close method frames + limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils + filter(f -> + { + try { + return Translog.class.isAssignableFrom(Class.forName(f.getClassName())); + } catch (Exception ignored) { + return false; + } + } + ). //find all inner callers including Translog subclasses + collect(Collectors.toList()); + //the list of inner callers should be either empty or should contain closeOnTragicEvent method + return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent")); } @Override @@ -796,7 +803,7 @@ public boolean ensureSynced(Stream locations) throws IOException { * * @param ex if an exception occurs closing the translog, it will be suppressed into the provided exception */ - private void closeOnTragicEvent(final Exception ex) { + protected void closeOnTragicEvent(final Exception ex) { // we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName(); if (tragedy.get() != null) { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 6e2418ec92033..e8b940832e6f2 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -70,6 +70,7 @@ import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.rest.yaml.section.Assertion; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -108,6 +109,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -2933,6 +2935,46 @@ public void testCloseSnapshotTwice() throws Exception { } } + public void testTranslogCloseInvariant() throws IOException { + //close method should never be called directly from Translog (the only exception is closeOnTragicEvent) + class MisbehavingTranslog extends Translog { + public MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException { + super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier); + } + + void callCloseDirectly() throws IOException { + close(); + } + + void callCloseUsingIOUtilsWithExceptionHandling() { + IOUtils.closeWhileHandlingException(this); + } + + void callCloseUsingIOUtils() throws IOException { + IOUtils.close(this); + } + + void callCloseOnTragicEvent() { + Exception e = new Exception("test tragic exception"); + tragedy.setTragicException(e); + closeOnTragicEvent(e); + } + } + + + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path path = createTempDir(); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get); + + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly()); + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils()); + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtilsWithExceptionHandling()); + misbehavingTranslog.callCloseOnTragicEvent(); + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null; From f42ad1d246479138510b3b16bae03ebbb83e5241 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 17 Aug 2018 15:58:35 +0200 Subject: [PATCH 08/12] Code style --- .../src/main/java/org/elasticsearch/index/translog/Translog.java | 1 - .../java/org/elasticsearch/index/translog/TranslogWriter.java | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 37d8d0c5d09ad..72c6210535f9c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -61,7 +61,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongSupplier; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 12b47b2366dad..f9291ec55ec88 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -41,7 +41,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; public class TranslogWriter extends BaseTranslogReader implements Closeable { From b06f2493dbee4505a46c8dcd0090d85c000aa9e4 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Fri, 17 Aug 2018 16:23:02 +0200 Subject: [PATCH 09/12] Code style --- .../index/translog/TranslogDeletionPolicyTests.java | 1 - .../java/org/elasticsearch/index/translog/TranslogTests.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 0b8652dc48e77..c8d4dbd43df2f 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -35,7 +35,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e8b940832e6f2..17f97ca7e19e6 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -70,7 +70,6 @@ import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; -import org.elasticsearch.test.rest.yaml.section.Assertion; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; @@ -2938,7 +2937,7 @@ public void testCloseSnapshotTwice() throws Exception { public void testTranslogCloseInvariant() throws IOException { //close method should never be called directly from Translog (the only exception is closeOnTragicEvent) class MisbehavingTranslog extends Translog { - public MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException { + MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier); } From 7fef4120cc3014d90879b66b7813c076b474491d Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 20 Aug 2018 17:04:07 +0200 Subject: [PATCH 10/12] Code review 2 fixes --- .../elasticsearch/index/translog/TragicExceptionHolder.java | 3 +-- .../org/elasticsearch/index/translog/TranslogWriter.java | 2 +- .../org/elasticsearch/index/translog/TranslogTests.java | 6 ++++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java index 902ff5df5a26b..eec346018da24 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -27,10 +27,9 @@ public class TragicExceptionHolder { public boolean setTragicException(Exception ex){ assert ex != null; if (tragedy.compareAndSet(null, ex)) { - tragedy.set(ex); return true; } else { - if (tragedy.get() != ex) { //to ensure there is no self-suppression + if (tragedy.get() != ex) { // to ensure there is no self-suppression tragedy.get().addSuppressed(ex); } return false; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index f9291ec55ec88..8de04ea8c9225 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -51,7 +51,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the number of translog operations written to this file */ private volatile int operationCounter; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ - private TragicExceptionHolder tragedy; + private final TragicExceptionHolder tragedy; /* A buffered outputstream what writes to the writers channel */ private final OutputStream outputStream; /* the total offset of this file including the bytes written to the file as well as into the buffer */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 17f97ca7e19e6..4ec479334ba67 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.Assertions; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -2506,7 +2507,7 @@ public void testWithRandomException() throws IOException { syncedDocs.addAll(unsynced); unsynced.clear(); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { - // fair enough + assertEquals(failableTLog.getTragicException(), ex); } catch (IOException ex) { assertEquals(ex.getMessage(), "__FAKE__ no space left on device"); assertEquals(failableTLog.getTragicException(), ex); @@ -2934,8 +2935,9 @@ public void testCloseSnapshotTwice() throws Exception { } } + // close method should never be called directly from Translog (the only exception is closeOnTragicEvent) public void testTranslogCloseInvariant() throws IOException { - //close method should never be called directly from Translog (the only exception is closeOnTragicEvent) + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); class MisbehavingTranslog extends Translog { MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier); From 87b521107cbd6a34b8bd076d47957c3914a81d62 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 20 Aug 2018 18:19:41 +0200 Subject: [PATCH 11/12] setTragicException returns void --- .../index/translog/TragicExceptionHolder.java | 11 ++++++----- .../elasticsearch/index/translog/TranslogWriter.java | 12 ++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java index eec346018da24..2f8bf59e44f4f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -24,15 +24,16 @@ public class TragicExceptionHolder { private final AtomicReference tragedy = new AtomicReference<>(); - public boolean setTragicException(Exception ex){ + /** + * Sets the tragic exception or if the tragic exception is already set adds passed exception as suppressed exception + * @param ex tragic exception to set + */ + public void setTragicException(Exception ex){ assert ex != null; - if (tragedy.compareAndSet(null, ex)) { - return true; - } else { + if (tragedy.compareAndSet(null, ex) == false) { if (tragedy.get() != ex) { // to ensure there is no self-suppression tragedy.get().addSuppressed(ex); } - return false; } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 8de04ea8c9225..78e026159b172 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -134,15 +134,15 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f } private synchronized void closeWithTragicEvent(final Exception ex) { - if (tragedy.setTragicException(ex)) { - try { - close(); - } catch (final IOException | RuntimeException e) { - ex.addSuppressed(e); - } + tragedy.setTragicException(ex); + try { + close(); + } catch (final IOException | RuntimeException e) { + ex.addSuppressed(e); } } + /** * add the given bytes to the translog and return the location they were written at */ From 6259de54092ca1568c34d4ca5ac2abb375770beb Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 20 Aug 2018 18:55:51 +0200 Subject: [PATCH 12/12] Code style --- .../org/elasticsearch/index/translog/TragicExceptionHolder.java | 2 +- .../java/org/elasticsearch/index/translog/TranslogWriter.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java index 2f8bf59e44f4f..b823a920039b5 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -28,7 +28,7 @@ public class TragicExceptionHolder { * Sets the tragic exception or if the tragic exception is already set adds passed exception as suppressed exception * @param ex tragic exception to set */ - public void setTragicException(Exception ex){ + public void setTragicException(Exception ex) { assert ex != null; if (tragedy.compareAndSet(null, ex) == false) { if (tragedy.get() != ex) { // to ensure there is no self-suppression diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 78e026159b172..f48f2ceb79278 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -142,7 +142,6 @@ private synchronized void closeWithTragicEvent(final Exception ex) { } } - /** * add the given bytes to the translog and return the location they were written at */