@@ -91,30 +91,6 @@
* <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
* there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}, so there is no race for
* peer operations.</li>
* <li>Need to synchronize on {@link #walsById}. Four methods modify it:
* {@link #addPeer(String)}, {@link #removePeer(String)},
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
* {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
* {@link PeerProcedureHandlerImpl}, so there is no race between {@link #addPeer(String)} and
* {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
* is called by {@link ReplicationSourceInterface}, so there is no race with
* {@link #addPeer(String)}. {@link #removePeer(String)} terminates the
* {@link ReplicationSourceInterface} first and only then removes the wals from {@link #walsById},
* so there is no race with {@link #removePeer(String)} either. The only case that needs
* synchronization is between {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
* and {@link #preLogRoll(Path)}.</li>
* <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. Three methods modify it:
* {@link #removePeer(String)},
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} terminates the
* {@link ReplicationSourceInterface} first and only then removes the wals from
* {@link #walsByIdRecoveredQueues}. {@link ReplicationSourceManager.NodeFailoverWorker#run()}
* adds the wals to {@link #walsByIdRecoveredQueues} first and only then starts up a
* {@link ReplicationSourceInterface}, so there is no race there. For
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)},
* both already synchronize on {@link #oldsources}, so there is no need to synchronize on
* {@link #walsByIdRecoveredQueues}.</li>
* <li>Need to synchronize on {@link #latestPaths} so that a newly opened replication source does
* not miss a new log.</li>
* <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
* to-be-removed peer (see the locking sketch below).</li>
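
With the in-memory wal maps gone, the synchronization notes above reduce to two monitor locks: latestPaths guards new-source registration against concurrent log rolls, and oldsources guards recovered sources against concurrent peer removal. A minimal, self-contained sketch of that discipline follows; the class and method names are illustrative stand-ins, not the real HBase types.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch of the two remaining monitor locks described above.
class LockingSketch {
  private final ConcurrentMap<String, Object> sources = new ConcurrentHashMap<>();
  private final Map<String, String> latestPaths = new HashMap<>(); // wal group -> latest wal
  private final List<Object> oldsources = new ArrayList<>();       // recovered sources

  void addSource(String peerId, Object src) {
    // Hold latestPaths so a log roll cannot slip in between registering the
    // source and enqueueing the current wal of every group.
    synchronized (latestPaths) {
      sources.put(peerId, src);
      latestPaths.values().forEach(wal -> { /* enqueue wal into src */ });
    }
  }

  void addRecoveredSource(Object src, boolean peerBeingRemoved) {
    // Hold oldsources so a concurrent removePeer cannot race with the
    // failover worker adding a recovered source for the same peer.
    synchronized (oldsources) {
      if (!peerBeingRemoved) {
        oldsources.add(src);
      }
    }
  }
}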
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
// All about stopping
private final Server server;

// All logs we are currently tracking
// Index structure of the map is: queue_id -> logPrefix/logGroup -> logs
// For a normal replication source, the peer id is the same as the queue id
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
// Logs for the recovered sources we are currently tracking
// the map is: queue_id -> logPrefix/logGroup -> logs
// For a recovered source, the queue id's format is peer_id-servername-*
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
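
For reference, the in-memory index removed by this change had the nested shape described in the comments above. A small sketch with made-up queue ids and wal names:

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

class WalsByIdShape {
  // queue_id -> wal group (log prefix) -> sorted wal names; all values made up.
  static Map<String, Map<String, NavigableSet<String>>> example() {
    NavigableSet<String> group = new TreeSet<>();
    group.add("rs1,16020,1577000000000.1577000001000");
    group.add("rs1,16020,1577000000000.1577000002000");
    Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
    walsByGroup.put("rs1,16020,1577000000000", group);
    Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
    walsById.put("peer_1", walsByGroup);
    return walsById;
  }
}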

private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;

private final Configuration conf;
@@ -195,8 +162,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
this.walsById = new ConcurrentHashMap<>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
this.oldsources = new ArrayList<>();
this.conf = conf;
this.fs = fs;
@@ -331,7 +296,6 @@ public void removePeer(String peerId) {
// Delete the queue from storage and memory; the queue id is the same as the
// peer id for a normal source
deleteQueue(peerId);
this.walsById.remove(peerId);
}
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
if (peerConfig.isSyncReplication()) {
@@ -372,15 +336,10 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
// synchronized on latestPaths to avoid missing the new log
synchronized (this.latestPaths) {
this.sources.put(peerId, src);
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
if (!latestPaths.isEmpty()) {
for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
Path walPath = walPrefixAndPath.getValue();
NavigableSet<String> wals = new TreeSet<>();
wals.add(walPath.getName());
walsByGroup.put(walPrefixAndPath.getKey(), wals);
// Abort the RS and throw an exception so that adding the peer fails
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -434,7 +393,10 @@ public void drainSources(String peerId) throws IOException, ReplicationException
// map from walsById since later we may fail to delete them from the replication queue
// storage, and when we retry next time, we cannot know which wal files need to be
// deleted from the replication queue storage.
walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
});
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
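
The replacement code above rebuilds the prefix-to-wals grouping directly from the replication queue storage instead of reading walsById. A stand-alone sketch of that grouping step, with a simplified prefix function standing in for AbstractFSWALProvider.getWALPrefixFromWALName:

import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

class GroupWalsByPrefix {
  // Simplified stand-in for the real prefix extraction; assumes the wal name
  // ends with a dot-separated timestamp suffix.
  static String prefix(String wal) {
    return wal.substring(0, wal.lastIndexOf('.'));
  }

  static Map<String, NavigableSet<String>> group(List<String> walsInQueue) {
    Map<String, NavigableSet<String>> wals = new TreeMap<>();
    for (String wal : walsInQueue) {
      wals.computeIfAbsent(prefix(wal), p -> new TreeSet<>()).add(wal);
    }
    return wals;
  }
}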
@@ -443,15 +405,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
queueStorage.removeWAL(server.getServerName(), peerId, wal);
}
}
synchronized (walsById) {
Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
wals.forEach((k, v) -> {
NavigableSet<String> walsByGroup = oldWals.get(k);
if (walsByGroup != null) {
walsByGroup.removeAll(v);
}
});
}
// synchronized on oldsources to avoid a race with NodeFailoverWorker. Since NodeFailoverWorker
// is a background task, we delete the file from the replication queue storage under the lock
// to simplify the logic.
@@ -463,7 +416,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
oldSource.terminate(terminateMessage);
oldSource.getSourceMetrics().clear();
queueStorage.removeQueue(server.getServerName(), queueId);
walsByIdRecoveredQueues.remove(queueId);
iter.remove();
}
}
@@ -476,7 +428,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
*/
public void refreshSources(String peerId) throws IOException {
public void refreshSources(String peerId) throws ReplicationException, IOException {
String terminateMessage = "Peer " + peerId +
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -489,9 +441,8 @@ public void refreshSources(String peerId) throws IOException {
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
@@ -512,9 +463,8 @@ public void refreshSources(String peerId) throws IOException {
for (String queueId : previousQueueIds) {
ReplicationSourceInterface replicationSource = createSource(queueId, peer);
this.oldsources.add(replicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
}
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
.forEach(wal -> src.enqueueLog(new Path(wal)));
toStartup.add(replicationSource);
}
}
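
Both branches of refreshSources now follow the same pattern: terminate the old source, create a fresh one, and replay the queue contents from storage into it. A compact sketch of that pattern; the storage and source interfaces are simplified stand-ins for the real ones.

import java.util.List;

class RefreshSketch {
  interface Source {
    void terminate(String reason);
    void enqueueLog(String wal);
    void startup();
  }

  interface QueueStorage {
    List<String> getWALsInQueue(String serverName, String queueId);
  }

  static Source refresh(Source old, Source fresh, QueueStorage storage, String serverName,
      String queueId) {
    if (old != null) {
      old.terminate("peer state or config changed");
    }
    // The queue storage, not an in-memory map, is now the source of truth for
    // which wals still need to be replicated.
    storage.getWALsInQueue(serverName, queueId).forEach(fresh::enqueueLog);
    fresh.startup();
    return fresh;
  }
}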
@@ -534,7 +484,6 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue {}", src.getQueueId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsByIdRecoveredQueues.remove(src.getQueueId());
return true;
}

@@ -557,8 +506,6 @@ void removeSource(ReplicationSourceInterface src) {
this.sources.remove(src.getPeerId());
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsById.remove(src.getQueueId());

}

/**
@@ -644,42 +591,19 @@ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
* @param source the replication source
*/
@VisibleForTesting
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
cleanOldLogs(walsToRemove, source);
walsToRemove.clear();
}
} else {
NavigableSet<String> wals;
NavigableSet<String> walsToRemove;
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals == null) {
return;
}
walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
walsToRemove = new TreeSet<>(walsToRemove);
}
// cleanOldLogs may take some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may already be down, so we do it outside the
// lock to avoid blocking preLogRoll
cleanOldLogs(walsToRemove, source);
// now let's remove the files in the set
synchronized (this.walsById) {
wals.removeAll(walsToRemove);
}
void cleanOldLogs(String log, boolean inclusive,
ReplicationSourceInterface source) {
NavigableSet<String> walsToRemove;
synchronized (this.latestPaths) {
walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
}
if (walsToRemove.isEmpty()) {
return;
}
// cleanOldLogs may take some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may already be down, so we do it outside the
// lock to avoid blocking preLogRoll
cleanOldLogs(walsToRemove, source);
}
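
The comment above describes a copy-then-work idiom: snapshot the removable wals while holding the lock, then do the potentially slow cleanup (for example deleting remote wals of a sync replication peer) outside it, so a concurrent preLogRoll is never blocked on I/O. A generic sketch of the idiom, with a plain monitor standing in for latestPaths:

import java.util.NavigableSet;
import java.util.TreeSet;

class CopyThenClean {
  private final Object lock = new Object();
  private final NavigableSet<String> wals = new TreeSet<>();

  void clean(String upTo, boolean inclusive) {
    NavigableSet<String> toRemove;
    synchronized (lock) {
      // headSet returns a live view, so copy it before leaving the lock.
      toRemove = new TreeSet<>(wals.headSet(upTo, inclusive));
    }
    if (toRemove.isEmpty()) {
      return;
    }
    // Potentially slow I/O happens outside the lock.
    toRemove.forEach(this::deleteWal);
    synchronized (lock) {
      wals.removeAll(toRemove);
    }
  }

  private void deleteWal(String wal) { /* delete from the underlying storage */ }
}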

private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
@@ -760,37 +684,6 @@ public void preLogRoll(Path newLog) throws IOException {
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
}

// synchronized on walsById to avoid a race with cleanOldLogs
synchronized (this.walsById) {
// Update walsById map
for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
.entrySet()) {
String peerId = entry.getKey();
Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
boolean existingPrefix = false;
for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
SortedSet<String> wals = walsEntry.getValue();
if (this.sources.isEmpty()) {
// If there are no slaves, we don't need to keep the old wals since
// we only consider the last one when a new slave comes in
wals.clear();
}
if (logPrefix.equals(walsEntry.getKey())) {
wals.add(logName);
existingPrefix = true;
}
}
if (!existingPrefix) {
// The new log belongs to a new group, add it into this peer
LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
NavigableSet<String> wals = new TreeSet<>();
wals.add(logName);
walsByPrefix.put(logPrefix, wals);
}
}
}

// Add to latestPaths
latestPaths.put(logPrefix, newLog);
}
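
With the in-memory bookkeeping gone, preLogRoll only has to persist the rolled wal into the replication queue storage and remember it in latestPaths. The abortAndThrowIOExceptionWhenFail call wraps the storage update; a hedged sketch of what such a wrapper typically does (the real implementation lives elsewhere in this class and may differ in detail):

import java.io.IOException;

class AbortOnFailSketch {
  interface QueueOperation {
    void exec() throws Exception; // stand-in for ReplicationException
  }

  void abortAndThrowIOExceptionWhenFail(QueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (Exception e) {
      // A regionserver that cannot persist replication state must not keep
      // serving; abort and surface the failure to the caller.
      abort("Failed to operate on replication queue storage", e);
      throw new IOException(e);
    }
  }

  private void abort(String why, Throwable cause) { /* server.abort(why, cause) */ }
}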
@@ -962,18 +855,6 @@ public void run() {
continue;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
LOG.trace("Added source for recovered queue: " + src.getQueueId());
for (String wal : walsSet) {
@@ -1005,7 +886,18 @@ public void join() {
* @return a sorted set of wal names
*/
@VisibleForTesting
public Map<String, Map<String, NavigableSet<String>>> getWALs() {
public Map<String, Map<String, NavigableSet<String>>> getWALs()
throws ReplicationException {
Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
for (ReplicationSourceInterface source : sources.values()) {
String queueId = source.getQueueId();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsById.put(queueId, walsByGroup);
for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
}
}
return Collections.unmodifiableMap(walsById);
}

@@ -1014,7 +906,18 @@ Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
* @return a sorted set of wal names
*/
@VisibleForTesting
Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
throws ReplicationException {
Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>();
for (ReplicationSourceInterface source : oldsources) {
String queueId = source.getQueueId();
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
}
}
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}

@@ -1177,4 +1080,21 @@ public void cleanUpHFileRefs(String peerId, List<String> files) {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}

private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
NavigableSet<String> walsToRemove = new TreeSet<>();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
try {
this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
if (walPrefix.equals(logPrefix)) {
walsToRemove.add(wal);
}
});
} catch (ReplicationException e) {
// Just log the exception here, as the recovered replication source will try to clean up again.
LOG.warn("Failed to read wals in queue {}", queueId, e);
}
return walsToRemove.headSet(log, inclusive);
}
}
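
getWalsToRemove relies on NavigableSet#headSet and on wal names within one group sorting chronologically. A small self-contained example of the cutoff behaviour, with made-up names:

import java.util.NavigableSet;
import java.util.TreeSet;

class HeadSetCutoff {
  public static void main(String[] args) {
    // With a fixed prefix and a fixed-width numeric suffix, lexicographic
    // order matches chronological order.
    NavigableSet<String> wals = new TreeSet<>();
    wals.add("rs1.100");
    wals.add("rs1.200");
    wals.add("rs1.300");
    // Everything up to and including the given wal is eligible for removal.
    System.out.println(wals.headSet("rs1.200", true)); // [rs1.100, rs1.200]
  }
}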