diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 00be66c5c0fd..4c864e5e4502 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -370,11 +370,9 @@ private void tryStartNewShipper(String walGroupId) { ReplicationSourceWALReader walReader = createNewWALReader(walGroupId, getStartOffset(walGroupId)); ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader); - Threads.setDaemonThreadRunning( - walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader." - + walGroupId + "," + queueId, - (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); - worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId())); + Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() + + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing); + worker.startup(this::retryRefreshing); return worker; } }); @@ -448,24 +446,30 @@ WALEntryFilter getWalEntryFilter() { return walEntryFilter; } - private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager, - String peerId) { - OOMEChecker.exitIfOOME(e, getClass().getSimpleName()); - LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e); + // log the error, check if the error is OOME, or whether we should abort the server + private void checkError(Thread t, Throwable error) { + OOMEChecker.exitIfOOME(error, getClass().getSimpleName()); + LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error); if (abortOnError) { - server.abort("Unexpected exception in " + t.getName(), e); + server.abort("Unexpected exception in " + t.getName(), error); } - if (manager != null) { - while (true) { - try { - LOG.info("Refreshing replication sources now due to previous error on thread: {}", - t.getName()); - manager.refreshSources(peerId); - break; - } catch (IOException | ReplicationException e1) { - LOG.error("Replication sources refresh failed.", e1); - sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); - } + } + + private void retryRefreshing(Thread t, Throwable error) { + checkError(t, error); + while (true) { + if (server.isAborted() || server.isStopped() || server.isStopping()) { + LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId()); + return; + } + try { + LOG.info("Refreshing replication sources now due to previous error on thread: {}", + t.getName()); + manager.refreshSources(getPeerId()); + break; + } catch (Exception e) { + LOG.error("Replication sources refresh failed.", e); + sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); } } } @@ -630,7 +634,7 @@ public ReplicationSourceInterface startup() { // keep looping in this thread until initialize eventually succeeds, // while the server main startup one can go on with its work. sourceRunning = false; - uncaughtException(t, e, null, null); + checkError(t, e); retryStartup.set(!this.abortOnError); do { if (retryStartup.get()) { @@ -641,7 +645,7 @@ public ReplicationSourceInterface startup() { initialize(); } catch (Throwable error) { setSourceStartupStatus(false); - uncaughtException(t, error, null, null); + checkError(t, error); retryStartup.set(!this.abortOnError); } }