Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void close() {
// making any more calls after this point (eg clear the queue)
RPC.stopProxy(proxy);
}
metrics.unregister();
}

protected QJournalProtocol getProxy() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.qjournal.client;

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
Expand All @@ -29,8 +28,6 @@
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;

import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;

/**
* The metrics for a journal from the writer's perspective.
*/
Expand All @@ -43,21 +40,6 @@ class IPCLoggerChannelMetrics {
private final MutableQuantiles[] writeEndToEndLatencyQuantiles;
private final MutableQuantiles[] writeRpcLatencyQuantiles;


/**
* In the case of the NN transitioning between states, edit logs are closed
* and reopened. Thus, the IPCLoggerChannel instance that writes to a
* given JournalNode may change over the lifetime of the process.
* However, metrics2 doesn't have a function to unregister a set of metrics
* and fails if a new metrics class is registered with the same name
* as the existing one. Hence, we have to maintain our own registry
* ("multiton") here, so that we have exactly one metrics instance
* per JournalNode, and switch out the pointer to the underlying
* IPCLoggerChannel instance.
*/
private static final Map<String, IPCLoggerChannelMetrics> REGISTRY =
Maps.newHashMap();

private IPCLoggerChannelMetrics(IPCLoggerChannel ch) {
this.ch = ch;

Expand All @@ -81,25 +63,16 @@ private IPCLoggerChannelMetrics(IPCLoggerChannel ch) {
writeRpcLatencyQuantiles = null;
}
}

private void setChannel(IPCLoggerChannel ch) {
assert ch.getRemoteAddress().equals(this.ch.getRemoteAddress());
this.ch = ch;

public void unregister() {
DefaultMetricsSystem.instance().unregisterSource(getName(ch));
}

static IPCLoggerChannelMetrics create(IPCLoggerChannel ch) {
String name = getName(ch);
synchronized (REGISTRY) {
IPCLoggerChannelMetrics m = REGISTRY.get(name);
if (m != null) {
m.setChannel(ch);
} else {
m = new IPCLoggerChannelMetrics(ch);
DefaultMetricsSystem.instance().register(name, null, m);
REGISTRY.put(name, m);
}
return m;
}
IPCLoggerChannelMetrics m = new IPCLoggerChannelMetrics(ch);
DefaultMetricsSystem.instance().register(name, null, m);
return m;
}

private static String getName(IPCLoggerChannel ch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
Expand Down Expand Up @@ -178,4 +179,20 @@ public void testStopSendingEditsWhenOutOfSync() throws Exception {

ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();
}

@Test
public void testMetricsRemovedOnClose() {
MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
String sourceName = "IPCLoggerChannel-"
+ FAKE_ADDR.getAddress().getHostAddress()
+ "-" + FAKE_ADDR.getPort();
// Ensure the metrics exist
MetricsSource source = metricsSystem.getSource(sourceName);
assertNotNull(source);

ch.close();
// ensure the metrics are removed.
source = metricsSystem.getSource(sourceName);
assertNull(source);
}
}