diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2973db521bd1..74816074c783 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -461,15 +461,16 @@ public void refreshSources(String peerId) throws IOException { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); - ReplicationSourceInterface src = createSource(peerId, peer); + ReplicationSourceInterface src; // synchronized on latestPaths to avoid missing the new log synchronized (this.latestPaths) { - ReplicationSourceInterface toRemove = this.sources.put(peerId, src); + ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - // Do not clear metrics - toRemove.terminate(terminateMessage, null, false); + toRemove.terminate(terminateMessage, null, true); } + src = createSource(peerId, peer); + this.sources.put(peerId, src); for (NavigableSet walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); }