Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**
Expand All @@ -43,6 +43,8 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
private String methodName = "";
private Object [] params = {};
private Message packet;
private boolean snapshot = false;
private Map<String, Object> callInfoMap = new HashMap<>();

public MonitoredRPCHandlerImpl() {
super();
Expand All @@ -53,11 +55,14 @@ public MonitoredRPCHandlerImpl() {

@Override
public synchronized MonitoredRPCHandlerImpl clone() {
return (MonitoredRPCHandlerImpl) super.clone();
MonitoredRPCHandlerImpl clone = (MonitoredRPCHandlerImpl) super.clone();
clone.setCallInfoMap(generateCallInfoMap());
clone.setSnapshot(true);
return clone;
}

/**
* Gets the status of this handler; if it is currently servicing an RPC,
* Gets the status of this handler; if it is currently servicing an RPC,
* this status will include the RPC information.
* @return a String describing the current status.
*/
Expand Down Expand Up @@ -233,6 +238,18 @@ public synchronized void markComplete(String status) {

@Override
public synchronized Map<String, Object> toMap() {
return this.snapshot ? this.callInfoMap : generateCallInfoMap();
}

public void setSnapshot(boolean snapshot) {
this.snapshot = snapshot;
}

public void setCallInfoMap(Map<String, Object> callInfoMap) {
this.callInfoMap = callInfoMap;
}

private Map<String, Object> generateCallInfoMap() {
// only include RPC info if the Handler is actively servicing an RPC call
Map<String, Object> map = super.toMap();
if (getState() != State.RUNNING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.monitoring;

import static org.junit.Assert.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand All @@ -34,9 +37,12 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MiscTests.class, SmallTests.class})
public class TestTaskMonitor {
private static final Logger LOG = LoggerFactory.getLogger(TestTaskMonitor.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
Expand Down Expand Up @@ -226,5 +232,66 @@ public void testStatusJournal() {
assertEquals("status3", task.getStatusJournal().get(1).getStatus());
tm.shutdown();
}

@Test
public void testClone() throws Exception {
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
monitor.abort("abort RPC");
TestParam testParam = new TestParam("param1");
monitor.setRPC("method1", new Object[]{ testParam }, 0);
MonitoredRPCHandlerImpl clone = monitor.clone();
assertEquals(clone.getDescription(), monitor.getDescription());
assertEquals(clone.getState(), monitor.getState());
assertEquals(clone.getStatus(), monitor.getStatus());
assertEquals(clone.toString(), monitor.toString());
assertEquals(clone.toMap(), monitor.toMap());
assertEquals(clone.toJSON(), monitor.toJSON());

// mark complete and make param dirty
monitor.markComplete("complete RPC");
testParam.setParam("dirtyParam");
assertEquals(clone.getDescription(), monitor.getDescription());
assertNotEquals(clone.getState(), monitor.getState());
assertNotEquals(clone.getStatus(), monitor.getStatus());
monitor.setState(MonitoredTask.State.RUNNING);
try {
// when markComplete, the param in monitor is set null, so toMap should fail here
monitor.toMap();
fail("Should not call toMap successfully, because param=null");
} catch (Exception e) {
}
// the param of clone monitor should not be dirty
assertNotEquals("[dirtyString]",
String.valueOf(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params")));

monitor.resume("resume");
monitor.setRPC("method2", new Object[]{new TestParam("param2")}, 1);
assertNotEquals(((Map<String, Object>) clone.toMap().get("rpcCall")).get("params"),
((Map<String, Object>) monitor.toMap().get("rpcCall")).get(
"params"));
LOG.info(String.valueOf(clone.toMap()));
LOG.info(String.valueOf(monitor.toMap()));
assertNotEquals(clone.toString(), monitor.toString());
assertNotEquals(clone.getRPCQueueTime(), monitor.getRPCQueueTime());
assertNotEquals(clone.toMap(), monitor.toMap());
assertNotEquals(clone.toJSON(), monitor.toJSON());
}

private class TestParam {
public String param = null;

public TestParam(String param) {
this.param = param;
}

public void setParam(String param) {
this.param = param;
}

@Override
public String toString() {
return param;
}
}
}