Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,14 @@ public void incrLogReadInBytes(long readInBytes) {

/** Removes all metrics about this Source. */
public void clear() {
terminate();
singleSourceSource.clear();
}

public void terminate() {
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastShippedTimeStamps.clear();
lastHFileRefsQueueSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,13 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
}
}
}
if (clearMetrics) {
// Can be null in test context.
if (this.metrics != null) {

// Can be null in test context.
if (this.metrics != null) {
if (clearMetrics) {
this.metrics.clear();
} else {
this.metrics.terminate();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ public void refreshSources(String peerId) throws IOException {
ReplicationSourceInterface toRemove = this.sources.remove(peerId);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage, null, true);
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
src = createSource(peerId, peer);
this.sources.put(peerId, src);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public void terminate(String reason, Exception e) {
public void terminate(String reason, Exception e, boolean clearMetrics) {
if (clearMetrics) {
this.metrics.clear();
} else {
this.metrics.terminate();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,41 @@ public void testRemovePeerMetricsCleanup() throws Exception {
}
}

@Test
public void testDisablePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());

// Refreshing the peer should decrement the global and single source metrics
manager.refreshSources(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}

private ReplicationSourceInterface mockReplicationSource(String peerId) {
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getPeerId()).thenReturn(peerId);
Expand Down