@@ -35,6 +35,8 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,6 +64,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The default implementation of FSWAL.
@@ -115,6 +118,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
private static final int DEFAULT_MAX_BATCH_COUNT = 200;

private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
* and syncs are each put on the ring which means handlers need to smash up against the ring twice
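
The javadoc above describes the LMAX Disruptor ring buffer that serializes all appends and syncs onto a single consumer thread. For readers unfamiliar with the API, a minimal publish/consume sketch against the plain com.lmax coordinates (HBase actually uses the relocated copy under org.apache.hbase.thirdparty; the Entry type here is a hypothetical stand-in):

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class RingSketch {
  // Slots are pre-allocated; a publisher claims a sequence, fills the slot,
  // then publishes it to the single consumer thread.
  static final class Entry { long txid; }

  public static void main(String[] args) {
    Disruptor<Entry> disruptor =
        new Disruptor<>(Entry::new, 1024, DaemonThreadFactory.INSTANCE);
    disruptor.handleEventsWith(
        (entry, sequence, endOfBatch) -> System.out.println("consumed txid=" + entry.txid));
    RingBuffer<Entry> ring = disruptor.start();
    long seq = ring.next();       // claim a slot; blocks if the ring is full
    try {
      ring.get(seq).txid = 42L;   // fill the claimed slot
    } finally {
      ring.publish(seq);          // hand it to the consumer
    }
  }
}
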
@@ -160,6 +166,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {

private final AtomicInteger closeErrorCount = new AtomicInteger();

private final int waitOnShutdownInSeconds;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
* using our logger instead of java native logger.
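
The closeExecutor introduced above is an unbounded cached pool of named daemon threads, so a slow filesystem close can never block a log roll. A self-contained sketch of the same construction pattern, using the plain Guava ThreadFactoryBuilder coordinates for illustration (FSHLog uses the relocated hbase-thirdparty copy):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class NamedDaemonPool {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newCachedThreadPool(
        new ThreadFactoryBuilder()
            .setDaemon(true)                      // daemon threads never hold up JVM exit
            .setNameFormat("Close-WAL-Writer-%d") // %d is a per-factory thread counter
            .build());
    pool.execute(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.SECONDS);
  }
}
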
@@ -224,6 +234,8 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
this.waitOnShutdownInSeconds = conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS,
DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
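
The new hbase.wal.fshlog.wait.on.shutdown.seconds knob read just above defaults to 5 seconds. A short sketch of how an operator or test could override it through a plain Hadoop Configuration (the same key works in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;

public class WaitOnShutdownConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow up to 30 seconds for outstanding background writer closes at shutdown.
    conf.setInt("hbase.wal.fshlog.wait.on.shutdown.seconds", 30);
    // The same lookup the FSHLog constructor performs, with its 5-second default.
    System.out.println(conf.getInt("hbase.wal.fshlog.wait.on.shutdown.seconds", 5));
  }
}
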
@@ -354,23 +366,22 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This close is inline with the critical section. Should it happen in the background?
if (this.writer != null) {
oldFileLen = this.writer.getLength();
try {
TraceUtil.addTimelineAnnotation("closing writer");
this.writer.close();
TraceUtil.addTimelineAnnotation("writer closed");
this.closeErrorCount.set(0);
} catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet();
if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
+ "\", errors=" + errors
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
} else {
throw ioe;
}
// If there are unflushed entries, or we have already reached the
// closeErrorsTolerated count, close the writer inline rather than asynchronously
// so that an IOException propagates back and aborts the region server.
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
closeWriter(this.writer, oldPath, true);
} else {
Writer localWriter = this.writer;
closeExecutor.execute(() -> {
try {
closeWriter(localWriter, oldPath, false);
} catch (IOException e) {
// Unreachable: closeWriter() only rethrows when called with syncCloseCall=true.
}
});
}
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
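
The hunk above is the heart of the change: the close that used to sit inline in the critical section (the removed TODO) is now conditional. When every edit is known to be synced and the error budget is intact, the old writer is handed to closeExecutor and the roll proceeds immediately; otherwise the close stays inline so an IOException can propagate and abort the region server. A stripped-down sketch of that decision, with a hypothetical Writer stand-in:

import java.io.IOException;
import java.util.concurrent.ExecutorService;

public class AsyncCloseSketch {
  interface Writer { void close() throws IOException; } // hypothetical stand-in

  static void replaceWriter(Writer oldWriter, ExecutorService closeExecutor,
      boolean unflushedEntries, int closeErrors, int tolerated) throws IOException {
    if (unflushedEntries || closeErrors >= tolerated) {
      // Inline: a failed close here must propagate and abort the region server.
      oldWriter.close();
    } else {
      closeExecutor.execute(() -> {
        try {
          // Background: all edits are already synced, so a failure is only logged.
          oldWriter.close();
        } catch (IOException e) {
          // Tolerated; the new writer is already in place.
        }
      });
    }
  }
}
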
@@ -412,6 +423,24 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
}
}

private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
try {
TraceUtil.addTimelineAnnotation("closing writer");
writer.close();
TraceUtil.addTimelineAnnotation("writer closed");
} catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet();
boolean hasUnflushedEntries = isUnflushedEntries();
if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated))) {
LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\", errors="
+ errors + ", hasUnflushedEntries=" + hasUnflushedEntries);
throw ioe;
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
}
}

@Override
protected void doShutdown() throws IOException {
// Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
@@ -436,6 +465,18 @@ protected void doShutdown() throws IOException {
this.writer.close();
this.writer = null;
}
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
LOG.error("We have waited {} seconds but the close of writer(s) doesn't complete."
+ "Please check the status of underlying filesystem"
+ " or increase the wait time by the config \"{}\"", this.waitOnShutdownInSeconds,
FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
}
} catch (InterruptedException e) {
LOG.error("The wait for termination of FSHLog writer(s) is interrupted");
Thread.currentThread().interrupt();
}
}
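
doShutdown now drains closeExecutor with the standard bounded-wait idiom: shutdown(), a timed awaitTermination(), and restoration of the interrupt flag. The same pattern in isolation:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdown {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    pool.execute(() -> { /* imagine a slow writer.close() here */ });
    pool.shutdown(); // stop accepting new tasks; queued ones may still finish
    try {
      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
        System.err.println("close tasks still pending after 5s; filesystem may be slow");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for callers
    }
  }
}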

@Override