diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
index e5541a7944b3..aac6bb73dd3b 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
@@ -55,7 +55,7 @@ org.apache.hadoop.hbase.quotas.QuotaUtil;
org.apache.hadoop.hbase.security.access.PermissionStorage;
org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-org.apache.hadoop.hbase.tool.Canary;
+org.apache.hadoop.hbase.tool.CanaryTool;
org.apache.hadoop.hbase.util.Bytes;
org.apache.hadoop.hbase.util.FSUtils;
org.apache.hadoop.hbase.util.JvmVersion;
@@ -513,7 +513,7 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
<%java>String description = null;
if (tableName.equals(TableName.META_TABLE_NAME)){
description = "The hbase:meta table holds references to all User Table regions.";
- } else if (tableName.equals(Canary.DEFAULT_WRITE_TABLE_NAME)){
+ } else if (tableName.equals(CanaryTool.DEFAULT_WRITE_TABLE_NAME)){
description = "The hbase:canary table is used to sniff the write availbility of"
+ " each regionserver.";
} else if (tableName.equals(PermissionStorage.ACL_TABLE_NAME)){
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 3cff2b7ef4f8..b49d2cebede1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -19,1765 +19,49 @@
package org.apache.hadoop.hbase.tool;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
-import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.AuthUtil;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ClusterMetrics;
-import org.apache.hadoop.hbase.ClusterMetrics.Option;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.RegionSplitter;
-import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.client.ConnectStringParser;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-/**
- * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
- *
- * There are three modes:
- *
- * - region mode (Default): For each region, try to get one row per column family outputting
- * information on failure (ERROR) or else the latency.
- *
- *
- * - regionserver mode: For each regionserver try to get one row from one table selected
- * randomly outputting information on failure (ERROR) or else the latency.
- *
- *
- * - zookeeper mode: for each zookeeper instance, selects a znode outputting information on
- * failure (ERROR) or else the latency.
- *
- *
- */
-@InterfaceAudience.Private
-public final class Canary implements Tool {
- /**
- * Sink interface used by the canary to output information
- */
- public interface Sink {
- long getReadFailureCount();
- long incReadFailureCount();
- Map getReadFailures();
- void updateReadFailures(String regionName, String serverName);
- long getWriteFailureCount();
- long incWriteFailureCount();
- Map getWriteFailures();
- void updateWriteFailures(String regionName, String serverName);
- long getReadSuccessCount();
- long incReadSuccessCount();
- long getWriteSuccessCount();
- long incWriteSuccessCount();
- }
-
- /**
- * Simple implementation of canary sink that allows plotting to a file or standard output.
- */
- public static class StdOutSink implements Sink {
- private AtomicLong readFailureCount = new AtomicLong(0),
- writeFailureCount = new AtomicLong(0),
- readSuccessCount = new AtomicLong(0),
- writeSuccessCount = new AtomicLong(0);
- private Map readFailures = new ConcurrentHashMap<>();
- private Map writeFailures = new ConcurrentHashMap<>();
-
- @Override
- public long getReadFailureCount() {
- return readFailureCount.get();
- }
-
- @Override
- public long incReadFailureCount() {
- return readFailureCount.incrementAndGet();
- }
-
- @Override
- public Map getReadFailures() {
- return readFailures;
- }
-
- @Override
- public void updateReadFailures(String regionName, String serverName) {
- readFailures.put(regionName, serverName);
- }
-
- @Override
- public long getWriteFailureCount() {
- return writeFailureCount.get();
- }
-
- @Override
- public long incWriteFailureCount() {
- return writeFailureCount.incrementAndGet();
- }
-
- @Override
- public Map getWriteFailures() {
- return writeFailures;
- }
-
- @Override
- public void updateWriteFailures(String regionName, String serverName) {
- writeFailures.put(regionName, serverName);
- }
-
- @Override
- public long getReadSuccessCount() {
- return readSuccessCount.get();
- }
-
- @Override
- public long incReadSuccessCount() {
- return readSuccessCount.incrementAndGet();
- }
-
- @Override
- public long getWriteSuccessCount() {
- return writeSuccessCount.get();
- }
-
- @Override
- public long incWriteSuccessCount() {
- return writeSuccessCount.incrementAndGet();
- }
- }
-
- /**
- * By RegionServer, for 'regionserver' mode.
- */
- public static class RegionServerStdOutSink extends StdOutSink {
- public void publishReadFailure(String table, String server) {
- incReadFailureCount();
- LOG.error("Read from {} on {}", table, server);
- }
-
- public void publishReadTiming(String table, String server, long msTime) {
- LOG.info("Read from {} on {} in {}ms", table, server, msTime);
- }
- }
-
- /**
- * Output for 'zookeeper' mode.
- */
- public static class ZookeeperStdOutSink extends StdOutSink {
- public void publishReadFailure(String znode, String server) {
- incReadFailureCount();
- LOG.error("Read from {} on {}", znode, server);
- }
-
- public void publishReadTiming(String znode, String server, long msTime) {
- LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
- }
- }
-
- /**
- * By Region, for 'region' mode.
- */
- public static class RegionStdOutSink extends StdOutSink {
- private Map perTableReadLatency = new HashMap<>();
- private LongAdder writeLatency = new LongAdder();
- private final Map regionMap = new ConcurrentHashMap<>();
-
- public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
- incReadFailureCount();
- LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e);
- }
-
- public void publishReadFailure(ServerName serverName, RegionInfo region,
- ColumnFamilyDescriptor column, Exception e) {
- incReadFailureCount();
- LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName,
- column.getNameAsString(), e);
- }
-
- public void publishReadTiming(ServerName serverName, RegionInfo region,
- ColumnFamilyDescriptor column, long msTime) {
- incReadSuccessCount();
- RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
- res.setReadSuccess();
- res.setReadLatency(msTime);
- LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
- column.getNameAsString(), msTime);
- }
-
- public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
- incWriteFailureCount();
- LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
- }
-
- public void publishWriteFailure(ServerName serverName, RegionInfo region,
- ColumnFamilyDescriptor column, Exception e) {
- incWriteFailureCount();
- LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
- column.getNameAsString(), e);
- }
-
- public void publishWriteTiming(ServerName serverName, RegionInfo region,
- ColumnFamilyDescriptor column, long msTime) {
- incWriteSuccessCount();
- RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
- res.setWriteSuccess();
- res.setWriteLatency(msTime);
- LOG.info("Write to {} on {} {} in {}ms",
- region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
- }
-
- public Map getReadLatencyMap() {
- return this.perTableReadLatency;
- }
-
- public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
- LongAdder initLatency = new LongAdder();
- this.perTableReadLatency.put(tableName, initLatency);
- return initLatency;
- }
-
- public void initializeWriteLatency() {
- this.writeLatency.reset();
- }
-
- public LongAdder getWriteLatency() {
- return this.writeLatency;
- }
-
- public Map getRegionMap() {
- return this.regionMap;
- }
-
- public int getTotalExpectedRegions() {
- return this.regionMap.size();
- }
- }
-
- /**
- * Run a single zookeeper Task and then exit.
- */
- static class ZookeeperTask implements Callable {
- private final Connection connection;
- private final String host;
- private String znode;
- private final int timeout;
- private ZookeeperStdOutSink sink;
-
- public ZookeeperTask(Connection connection, String host, String znode, int timeout,
- ZookeeperStdOutSink sink) {
- this.connection = connection;
- this.host = host;
- this.znode = znode;
- this.timeout = timeout;
- this.sink = sink;
- }
-
- @Override public Void call() throws Exception {
- ZooKeeper zooKeeper = null;
- try {
- zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
- Stat exists = zooKeeper.exists(znode, false);
- StopWatch stopwatch = new StopWatch();
- stopwatch.start();
- zooKeeper.getData(znode, false, exists);
- stopwatch.stop();
- sink.publishReadTiming(znode, host, stopwatch.getTime());
- } catch (KeeperException | InterruptedException e) {
- sink.publishReadFailure(znode, host);
- } finally {
- if (zooKeeper != null) {
- zooKeeper.close();
- }
- }
- return null;
- }
- }
-
- /**
- * Run a single Region Task and then exit. For each column family of the Region, get one row and
- * output latency or failure.
- */
- static class RegionTask implements Callable {
- public enum TaskType{
- READ, WRITE
- }
- private Connection connection;
- private RegionInfo region;
- private RegionStdOutSink sink;
- private TaskType taskType;
- private boolean rawScanEnabled;
- private ServerName serverName;
- private LongAdder readWriteLatency;
-
- RegionTask(Connection connection, RegionInfo region, ServerName serverName,
- RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
- this.connection = connection;
- this.region = region;
- this.serverName = serverName;
- this.sink = sink;
- this.taskType = taskType;
- this.rawScanEnabled = rawScanEnabled;
- this.readWriteLatency = rwLatency;
- }
-
- @Override
- public Void call() {
- switch (taskType) {
- case READ:
- return read();
- case WRITE:
- return write();
- default:
- return read();
- }
- }
-
- public Void read() {
- Table table = null;
- TableDescriptor tableDesc = null;
- try {
- LOG.debug("Reading table descriptor for table {}", region.getTable());
- table = connection.getTable(region.getTable());
- tableDesc = table.getDescriptor();
- } catch (IOException e) {
- LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
- sink.publishReadFailure(serverName, region, e);
- if (table != null) {
- try {
- table.close();
- } catch (IOException ioe) {
- LOG.error("Close table failed", e);
- }
- }
- return null;
- }
-
- byte[] startKey = null;
- Get get = null;
- Scan scan = null;
- ResultScanner rs = null;
- StopWatch stopWatch = new StopWatch();
- for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
- stopWatch.reset();
- startKey = region.getStartKey();
- // Can't do a get on empty start row so do a Scan of first element if any instead.
- if (startKey.length > 0) {
- get = new Get(startKey);
- get.setCacheBlocks(false);
- get.setFilter(new FirstKeyOnlyFilter());
- get.addFamily(column.getName());
- } else {
- scan = new Scan();
- LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName());
- scan.setRaw(rawScanEnabled);
- scan.setCaching(1);
- scan.setCacheBlocks(false);
- scan.setFilter(new FirstKeyOnlyFilter());
- scan.addFamily(column.getName());
- scan.setMaxResultSize(1L);
- scan.setOneRowLimit();
- }
- LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(),
- region.getRegionNameAsString(), column.getNameAsString(),
- Bytes.toStringBinary(startKey));
- try {
- stopWatch.start();
- if (startKey.length > 0) {
- table.get(get);
- } else {
- rs = table.getScanner(scan);
- rs.next();
- }
- stopWatch.stop();
- this.readWriteLatency.add(stopWatch.getTime());
- sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
- } catch (Exception e) {
- sink.publishReadFailure(serverName, region, column, e);
- sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
- } finally {
- if (rs != null) {
- rs.close();
- }
- scan = null;
- get = null;
- }
- }
- try {
- table.close();
- } catch (IOException e) {
- LOG.error("Close table failed", e);
- }
- return null;
- }
-
- /**
- * Check writes for the canary table
- */
- private Void write() {
- Table table = null;
- TableDescriptor tableDesc = null;
- try {
- table = connection.getTable(region.getTable());
- tableDesc = table.getDescriptor();
- byte[] rowToCheck = region.getStartKey();
- if (rowToCheck.length == 0) {
- rowToCheck = new byte[]{0x0};
- }
- int writeValueSize =
- connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
- for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
- Put put = new Put(rowToCheck);
- byte[] value = new byte[writeValueSize];
- Bytes.random(value);
- put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
-
- LOG.debug("Writing to {} {} {} {}",
- tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
- Bytes.toStringBinary(rowToCheck));
- try {
- long startTime = System.currentTimeMillis();
- table.put(put);
- long time = System.currentTimeMillis() - startTime;
- this.readWriteLatency.add(time);
- sink.publishWriteTiming(serverName, region, column, time);
- } catch (Exception e) {
- sink.publishWriteFailure(serverName, region, column, e);
- }
- }
- table.close();
- } catch (IOException e) {
- sink.publishWriteFailure(serverName, region, e);
- sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
- }
- return null;
- }
- }
-
- /**
- * Run a single RegionServer Task and then exit.
- * Get one row from a region on the regionserver and output latency or the failure.
- */
- static class RegionServerTask implements Callable {
- private Connection connection;
- private String serverName;
- private RegionInfo region;
- private RegionServerStdOutSink sink;
- private AtomicLong successes;
-
- RegionServerTask(Connection connection, String serverName, RegionInfo region,
- RegionServerStdOutSink sink, AtomicLong successes) {
- this.connection = connection;
- this.serverName = serverName;
- this.region = region;
- this.sink = sink;
- this.successes = successes;
- }
-
- @Override
- public Void call() {
- TableName tableName = null;
- Table table = null;
- Get get = null;
- byte[] startKey = null;
- Scan scan = null;
- StopWatch stopWatch = new StopWatch();
- // monitor one region on every region server
- stopWatch.reset();
- try {
- tableName = region.getTable();
- table = connection.getTable(tableName);
- startKey = region.getStartKey();
- // Can't do a get on empty start row so do a Scan of first element if any instead.
- LOG.debug("Reading from {} {} {} {}",
- serverName, region.getTable(), region.getRegionNameAsString(),
- Bytes.toStringBinary(startKey));
- if (startKey.length > 0) {
- get = new Get(startKey);
- get.setCacheBlocks(false);
- get.setFilter(new FirstKeyOnlyFilter());
- stopWatch.start();
- table.get(get);
- stopWatch.stop();
- } else {
- scan = new Scan();
- scan.setCacheBlocks(false);
- scan.setFilter(new FirstKeyOnlyFilter());
- scan.setCaching(1);
- scan.setMaxResultSize(1L);
- scan.setOneRowLimit();
- stopWatch.start();
- ResultScanner s = table.getScanner(scan);
- s.next();
- s.close();
- stopWatch.stop();
- }
- successes.incrementAndGet();
- sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
- } catch (TableNotFoundException tnfe) {
- LOG.error("Table may be deleted", tnfe);
- // This is ignored because it doesn't imply that the regionserver is dead
- } catch (TableNotEnabledException tnee) {
- // This is considered a success since we got a response.
- successes.incrementAndGet();
- LOG.debug("The targeted table was disabled. Assuming success.");
- } catch (DoNotRetryIOException dnrioe) {
- sink.publishReadFailure(tableName.getNameAsString(), serverName);
- LOG.error(dnrioe.toString(), dnrioe);
- } catch (IOException e) {
- sink.publishReadFailure(tableName.getNameAsString(), serverName);
- LOG.error(e.toString(), e);
- } finally {
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {/* DO NOTHING */
- LOG.error("Close table failed", e);
- }
- }
- scan = null;
- get = null;
- startKey = null;
- }
- return null;
- }
- }
-
- private static final int USAGE_EXIT_CODE = 1;
- private static final int INIT_ERROR_EXIT_CODE = 2;
- private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
- private static final int ERROR_EXIT_CODE = 4;
- private static final int FAILURE_EXIT_CODE = 5;
-
- private static final long DEFAULT_INTERVAL = 60000;
-
- private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
- private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
-
- private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
-
- public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
- NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
-
- private static final String CANARY_TABLE_FAMILY_NAME = "Test";
-
- private Configuration conf = null;
- private long interval = 0;
- private Sink sink = null;
-
- private boolean useRegExp;
- private long timeout = DEFAULT_TIMEOUT;
- private boolean failOnError = true;
-
- /**
- * True if we are to run in 'regionServer' mode.
- */
- private boolean regionServerMode = false;
-
- /**
- * True if we are to run in zookeeper 'mode'.
- */
- private boolean zookeeperMode = false;
-
- private long permittedFailures = 0;
- private boolean regionServerAllRegions = false;
- private boolean writeSniffing = false;
- private long configuredWriteTableTimeout = DEFAULT_TIMEOUT;
- private boolean treatFailureAsError = false;
- private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
-
- /**
- * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
- * we aggregate time to fetch each region and it needs to be less than this value else we
- * log an ERROR.
- */
- private HashMap configuredReadTableTimeouts = new HashMap<>();
+import org.apache.yetus.audience.InterfaceAudience;
- private ExecutorService executor; // threads to retrieve data from regionservers
+@InterfaceAudience.Public
+public interface Canary {
- public Canary() {
- this(new ScheduledThreadPoolExecutor(1));
- }
-
- public Canary(ExecutorService executor) {
- this(executor, null);
+ static Canary create(Configuration conf, ExecutorService executor) {
+ return new CanaryTool(conf, executor);
}
@VisibleForTesting
- Canary(ExecutorService executor, Sink sink) {
- this.executor = executor;
- this.sink = sink;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- private int parseArgs(String[] args) {
- int index = -1;
- // Process command line args
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
-
- if (cmd.startsWith("-")) {
- if (index >= 0) {
- // command line args must be in the form: [opts] [table 1 [table 2 ...]]
- System.err.println("Invalid command line options");
- printUsageAndExit();
- }
-
- if (cmd.equals("-help") || cmd.equals("-h")) {
- // user asked for help, print the help and quit.
- printUsageAndExit();
- } else if (cmd.equals("-daemon") && interval == 0) {
- // user asked for daemon mode, set a default interval between checks
- interval = DEFAULT_INTERVAL;
- } else if (cmd.equals("-interval")) {
- // user has specified an interval for canary breaths (-interval N)
- i++;
-
- if (i == args.length) {
- System.err.println("-interval takes a numeric seconds value argument.");
- printUsageAndExit();
- }
-
- try {
- interval = Long.parseLong(args[i]) * 1000;
- } catch (NumberFormatException e) {
- System.err.println("-interval needs a numeric value argument.");
- printUsageAndExit();
- }
- } else if (cmd.equals("-zookeeper")) {
- this.zookeeperMode = true;
- } else if(cmd.equals("-regionserver")) {
- this.regionServerMode = true;
- } else if(cmd.equals("-allRegions")) {
- this.regionServerAllRegions = true;
- } else if(cmd.equals("-writeSniffing")) {
- this.writeSniffing = true;
- } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
- this.treatFailureAsError = true;
- } else if (cmd.equals("-e")) {
- this.useRegExp = true;
- } else if (cmd.equals("-t")) {
- i++;
-
- if (i == args.length) {
- System.err.println("-t takes a numeric milliseconds value argument.");
- printUsageAndExit();
- }
-
- try {
- this.timeout = Long.parseLong(args[i]);
- } catch (NumberFormatException e) {
- System.err.println("-t takes a numeric milliseconds value argument.");
- printUsageAndExit();
- }
- } else if(cmd.equals("-writeTableTimeout")) {
- i++;
-
- if (i == args.length) {
- System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
- printUsageAndExit();
- }
-
- try {
- this.configuredWriteTableTimeout = Long.parseLong(args[i]);
- } catch (NumberFormatException e) {
- System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
- printUsageAndExit();
- }
- } else if (cmd.equals("-writeTable")) {
- i++;
-
- if (i == args.length) {
- System.err.println("-writeTable takes a string tablename value argument.");
- printUsageAndExit();
- }
- this.writeTableName = TableName.valueOf(args[i]);
- } else if (cmd.equals("-f")) {
- i++;
-
- if (i == args.length) {
- System.err
- .println("-f needs a boolean value argument (true|false).");
- printUsageAndExit();
- }
-
- this.failOnError = Boolean.parseBoolean(args[i]);
- } else if (cmd.equals("-readTableTimeouts")) {
- i++;
-
- if (i == args.length) {
- System.err.println("-readTableTimeouts needs a comma-separated list of read " +
- "millisecond timeouts per table (without spaces).");
- printUsageAndExit();
- }
- String [] tableTimeouts = args[i].split(",");
- for (String tT: tableTimeouts) {
- String [] nameTimeout = tT.split("=");
- if (nameTimeout.length < 2) {
- System.err.println("Each -readTableTimeouts argument must be of the form " +
- "= (without spaces).");
- printUsageAndExit();
- }
- long timeoutVal = 0L;
- try {
- timeoutVal = Long.parseLong(nameTimeout[1]);
- } catch (NumberFormatException e) {
- System.err.println("-readTableTimeouts read timeout for each table must be a numeric value argument.");
- printUsageAndExit();
- }
- this.configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
- }
- } else if (cmd.equals("-permittedZookeeperFailures")) {
- i++;
-
- if (i == args.length) {
- System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
- printUsageAndExit();
- }
- try {
- this.permittedFailures = Long.parseLong(args[i]);
- } catch (NumberFormatException e) {
- System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
- printUsageAndExit();
- }
- } else {
- // no options match
- System.err.println(cmd + " options is invalid.");
- printUsageAndExit();
- }
- } else if (index < 0) {
- // keep track of first table name specified by the user
- index = i;
- }
- }
- if (this.regionServerAllRegions && !this.regionServerMode) {
- System.err.println("-allRegions can only be specified in regionserver mode.");
- printUsageAndExit();
- }
- if (this.zookeeperMode) {
- if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
- System.err.println("-zookeeper is exclusive and cannot be combined with "
- + "other modes.");
- printUsageAndExit();
- }
- }
- if (this.permittedFailures != 0 && !this.zookeeperMode) {
- System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
- printUsageAndExit();
- }
- if (!this.configuredReadTableTimeouts.isEmpty() && (this.regionServerMode || this.zookeeperMode)) {
- System.err.println("-readTableTimeouts can only be configured in region mode.");
- printUsageAndExit();
- }
- return index;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- int index = parseArgs(args);
- ChoreService choreService = null;
-
- // Launches chore for refreshing kerberos credentials if security is enabled.
- // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
- // for more details.
- final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
- if (authChore != null) {
- choreService = new ChoreService("CANARY_TOOL");
- choreService.scheduleChore(authChore);
- }
-
- // Start to prepare the stuffs
- Monitor monitor = null;
- Thread monitorThread = null;
- long startTime = 0;
- long currentTimeLength = 0;
- // Get a connection to use in below.
- try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
- do {
- // Do monitor !!
- try {
- monitor = this.newMonitor(connection, index, args);
- monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
- startTime = System.currentTimeMillis();
- monitorThread.start();
- while (!monitor.isDone()) {
- // wait for 1 sec
- Thread.sleep(1000);
- // exit if any error occurs
- if (this.failOnError && monitor.hasError()) {
- monitorThread.interrupt();
- if (monitor.initialized) {
- return monitor.errorCode;
- } else {
- return INIT_ERROR_EXIT_CODE;
- }
- }
- currentTimeLength = System.currentTimeMillis() - startTime;
- if (currentTimeLength > this.timeout) {
- LOG.error("The monitor is running too long (" + currentTimeLength
- + ") after timeout limit:" + this.timeout
- + " will be killed itself !!");
- if (monitor.initialized) {
- return TIMEOUT_ERROR_EXIT_CODE;
- } else {
- return INIT_ERROR_EXIT_CODE;
- }
- }
- }
-
- if (this.failOnError && monitor.finalCheckForErrors()) {
- monitorThread.interrupt();
- return monitor.errorCode;
- }
- } finally {
- if (monitor != null) monitor.close();
- }
-
- Thread.sleep(interval);
- } while (interval > 0);
- } // try-with-resources close
-
- if (choreService != null) {
- choreService.shutdown();
- }
- return monitor.errorCode;
- }
-
- public Map getReadFailures() {
- return sink.getReadFailures();
- }
-
- public Map getWriteFailures() {
- return sink.getWriteFailures();
- }
-
- private void printUsageAndExit() {
- System.err.println(
- "Usage: canary [OPTIONS] [ [ [ interval between checks in seconds");
- System.err.println(" -e consider table/regionserver argument as regular " +
- "expression");
- System.err.println(" -f exit on first error; default=true");
- System.err.println(" -failureAsError treat read/write failure as error");
- System.err.println(" -t timeout for canary-test run; default=600000ms");
- System.err.println(" -writeSniffing enable write sniffing");
- System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary");
- System.err.println(" -writeTableTimeout timeout for writeTable; default=600000ms");
- System.err.println(" -readTableTimeouts =," +
- "=,...");
- System.err.println(" comma-separated list of table read timeouts " +
- "(no spaces);");
- System.err.println(" logs 'ERROR' if takes longer. default=600000ms");
- System.err.println(" -permittedZookeeperFailures Ignore first N failures attempting to ");
- System.err.println(" connect to individual zookeeper nodes in ensemble");
- System.err.println("");
- System.err.println(" -D= to assign or override configuration params");
- System.err.println(" -Dhbase.canary.read.raw.enabled= Set to enable/disable " +
- "raw scan; default=false");
- System.err.println("");
- System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
- "zookeeper.");
- System.err.println("To sniff/probe all regions, pass no arguments.");
- System.err.println("To sniff/probe all regions of a table, pass tablename.");
- System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
- System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
- System.exit(USAGE_EXIT_CODE);
- }
-
- Sink getSink(Configuration configuration, Class clazz) {
- // In test context, this.sink might be set. Use it if non-null. For testing.
- return this.sink != null? this.sink:
- (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
- clazz, Sink.class));
- }
-
- /**
- * Canary region mode-specific data structure which stores information about each region
- * to be scanned
- */
- public static class RegionTaskResult {
- private RegionInfo region;
- private TableName tableName;
- private ServerName serverName;
- private AtomicLong readLatency = null;
- private AtomicLong writeLatency = null;
- private boolean readSuccess = false;
- private boolean writeSuccess = false;
-
- public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) {
- this.region = region;
- this.tableName = tableName;
- this.serverName = serverName;
- }
-
- public RegionInfo getRegionInfo() {
- return this.region;
- }
-
- public String getRegionNameAsString() {
- return this.region.getRegionNameAsString();
- }
-
- public TableName getTableName() {
- return this.tableName;
- }
-
- public String getTableNameAsString() {
- return this.tableName.getNameAsString();
- }
-
- public ServerName getServerName() {
- return this.serverName;
- }
-
- public String getServerNameAsString() {
- return this.serverName.getServerName();
- }
-
- public long getReadLatency() {
- if (this.readLatency == null) {
- return -1;
- }
- return this.readLatency.get();
- }
-
- public void setReadLatency(long readLatency) {
- if (this.readLatency != null) {
- this.readLatency.set(readLatency);
- } else {
- this.readLatency = new AtomicLong(readLatency);
- }
- }
-
- public long getWriteLatency() {
- if (this.writeLatency == null) {
- return -1;
- }
- return this.writeLatency.get();
- }
-
- public void setWriteLatency(long writeLatency) {
- if (this.writeLatency != null) {
- this.writeLatency.set(writeLatency);
- } else {
- this.writeLatency = new AtomicLong(writeLatency);
- }
- }
-
- public boolean isReadSuccess() {
- return this.readSuccess;
- }
-
- public void setReadSuccess() {
- this.readSuccess = true;
- }
-
- public boolean isWriteSuccess() {
- return this.writeSuccess;
- }
-
- public void setWriteSuccess() {
- this.writeSuccess = true;
- }
- }
-
- /**
- * A Factory method for {@link Monitor}.
- * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
- * @param index a start index for monitor target
- * @param args args passed from user
- * @return a Monitor instance
- */
- public Monitor newMonitor(final Connection connection, int index, String[] args) {
- Monitor monitor = null;
- String[] monitorTargets = null;
-
- if (index >= 0) {
- int length = args.length - index;
- monitorTargets = new String[length];
- System.arraycopy(args, index, monitorTargets, 0, length);
- }
-
- if (this.regionServerMode) {
- monitor =
- new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
- getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
- this.executor, this.regionServerAllRegions,
- this.treatFailureAsError, this.permittedFailures);
- } else if (this.zookeeperMode) {
- monitor =
- new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
- getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
- this.executor, this.treatFailureAsError,
- this.permittedFailures);
- } else {
- monitor =
- new RegionMonitor(connection, monitorTargets, this.useRegExp,
- getSink(connection.getConfiguration(), RegionStdOutSink.class),
- this.executor, this.writeSniffing,
- this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
- this.configuredWriteTableTimeout, this.permittedFailures);
- }
- return monitor;
- }
-
- /**
- * A Monitor super-class can be extended by users
- */
- public static abstract class Monitor implements Runnable, Closeable {
- protected Connection connection;
- protected Admin admin;
- /**
- * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
- * Passed on the command-line as arguments.
- */
- protected String[] targets;
- protected boolean useRegExp;
- protected boolean treatFailureAsError;
- protected boolean initialized = false;
-
- protected boolean done = false;
- protected int errorCode = 0;
- protected long allowedFailures = 0;
- protected Sink sink;
- protected ExecutorService executor;
-
- public boolean isDone() {
- return done;
- }
-
- public boolean hasError() {
- return errorCode != 0;
- }
-
- public boolean finalCheckForErrors() {
- if (errorCode != 0) {
- return true;
- }
- if (treatFailureAsError &&
- (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) {
- LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
- errorCode = FAILURE_EXIT_CODE;
- return true;
- }
- return false;
- }
-
- @Override
- public void close() throws IOException {
- if (this.admin != null) this.admin.close();
- }
-
- protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
- ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
- if (null == connection) throw new IllegalArgumentException("connection shall not be null");
-
- this.connection = connection;
- this.targets = monitorTargets;
- this.useRegExp = useRegExp;
- this.treatFailureAsError = treatFailureAsError;
- this.sink = sink;
- this.executor = executor;
- this.allowedFailures = allowedFailures;
- }
-
- @Override
- public abstract void run();
-
- protected boolean initAdmin() {
- if (null == this.admin) {
- try {
- this.admin = this.connection.getAdmin();
- } catch (Exception e) {
- LOG.error("Initial HBaseAdmin failed...", e);
- this.errorCode = INIT_ERROR_EXIT_CODE;
- }
- } else if (admin.isAborted()) {
- LOG.error("HBaseAdmin aborted");
- this.errorCode = INIT_ERROR_EXIT_CODE;
- }
- return !this.hasError();
- }
+ static Canary create(Configuration conf, ExecutorService executor, CanaryTool.Sink sink) {
+ return new CanaryTool(conf, executor, sink);
}
/**
- * A monitor for region mode.
+ * Runs Canary in Region mode.
+ *
+ * @param targets -- list of monitor tables.
+ * @return the exit code of the Canary tool.
*/
- private static class RegionMonitor extends Monitor {
- // 10 minutes
- private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
- // 1 days
- private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
-
- private long lastCheckTime = -1;
- private boolean writeSniffing;
- private TableName writeTableName;
- private int writeDataTTL;
- private float regionsLowerLimit;
- private float regionsUpperLimit;
- private int checkPeriod;
- private boolean rawScanEnabled;
-
- /**
- * This is a timeout per table. If read of each region in the table aggregated takes longer
- * than what is configured here, we log an ERROR rather than just an INFO.
- */
- private HashMap configuredReadTableTimeouts;
-
- private long configuredWriteTableTimeout;
-
- public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
- Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
- boolean treatFailureAsError, HashMap configuredReadTableTimeouts,
- long configuredWriteTableTimeout,
- long allowedFailures) {
- super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
- allowedFailures);
- Configuration conf = connection.getConfiguration();
- this.writeSniffing = writeSniffing;
- this.writeTableName = writeTableName;
- this.writeDataTTL =
- conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
- this.regionsLowerLimit =
- conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
- this.regionsUpperLimit =
- conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
- this.checkPeriod =
- conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
- DEFAULT_WRITE_TABLE_CHECK_PERIOD);
- this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
- this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
- this.configuredWriteTableTimeout = configuredWriteTableTimeout;
- }
-
- private RegionStdOutSink getSink() {
- if (!(sink instanceof RegionStdOutSink)) {
- throw new RuntimeException("Can only write to Region sink");
- }
- return ((RegionStdOutSink) sink);
- }
-
- @Override
- public void run() {
- if (this.initAdmin()) {
- try {
- List> taskFutures = new LinkedList<>();
- RegionStdOutSink regionSink = this.getSink();
- if (this.targets != null && this.targets.length > 0) {
- String[] tables = generateMonitorTables(this.targets);
- // Check to see that each table name passed in the -readTableTimeouts argument is also
- // passed as a monitor target.
- if (!new HashSet<>(Arrays.asList(tables)).
- containsAll(this.configuredReadTableTimeouts.keySet())) {
- LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
- "passed via command line.");
- this.errorCode = USAGE_EXIT_CODE;
- return;
- }
- this.initialized = true;
- for (String table : tables) {
- LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
- taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ,
- this.rawScanEnabled, readLatency));
- }
- } else {
- taskFutures.addAll(sniff(TaskType.READ, regionSink));
- }
-
- if (writeSniffing) {
- if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
- try {
- checkWriteTableDistribution();
- } catch (IOException e) {
- LOG.error("Check canary table distribution failed!", e);
- }
- lastCheckTime = EnvironmentEdgeManager.currentTime();
- }
- // sniff canary table with write operation
- regionSink.initializeWriteLatency();
- LongAdder writeTableLatency = regionSink.getWriteLatency();
- taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
- executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
- }
-
- for (Future future : taskFutures) {
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.error("Sniff region failed!", e);
- }
- }
- Map actualReadTableLatency = regionSink.getReadLatencyMap();
- for (Map.Entry entry : configuredReadTableTimeouts.entrySet()) {
- String tableName = entry.getKey();
- if (actualReadTableLatency.containsKey(tableName)) {
- Long actual = actualReadTableLatency.get(tableName).longValue();
- Long configured = entry.getValue();
- if (actual > configured) {
- LOG.error("Read operation for {} took {}ms (Configured read timeout {}ms.",
- tableName, actual, configured);
- } else {
- LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
- tableName, actual, configured);
- }
- } else {
- LOG.error("Read operation for {} failed!", tableName);
- }
- }
- if (this.writeSniffing) {
- String writeTableStringName = this.writeTableName.getNameAsString();
- long actualWriteLatency = regionSink.getWriteLatency().longValue();
- LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
- writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
- // Check that the writeTable write operation latency does not exceed the configured timeout.
- if (actualWriteLatency > this.configuredWriteTableTimeout) {
- LOG.error("Write operation for {} exceeded the configured write timeout.",
- writeTableStringName);
- }
- }
- } catch (Exception e) {
- LOG.error("Run regionMonitor failed", e);
- this.errorCode = ERROR_EXIT_CODE;
- } finally {
- this.done = true;
- }
- }
- this.done = true;
- }
-
- /**
- * @return List of tables to use in test.
- */
- private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
- String[] returnTables = null;
-
- if (this.useRegExp) {
- Pattern pattern = null;
- List tds = null;
- Set tmpTables = new TreeSet<>();
- try {
- LOG.debug(String.format("reading list of tables"));
- tds = this.admin.listTableDescriptors(pattern);
- if (tds == null) {
- tds = Collections.emptyList();
- }
- for (String monitorTarget : monitorTargets) {
- pattern = Pattern.compile(monitorTarget);
- for (TableDescriptor td : tds) {
- if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
- tmpTables.add(td.getTableName().getNameAsString());
- }
- }
- }
- } catch (IOException e) {
- LOG.error("Communicate with admin failed", e);
- throw e;
- }
-
- if (tmpTables.size() > 0) {
- returnTables = tmpTables.toArray(new String[tmpTables.size()]);
- } else {
- String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
- LOG.error(msg);
- this.errorCode = INIT_ERROR_EXIT_CODE;
- throw new TableNotFoundException(msg);
- }
- } else {
- returnTables = monitorTargets;
- }
-
- return returnTables;
- }
-
- /*
- * Canary entry point to monitor all the tables.
- */
- private List> sniff(TaskType taskType, RegionStdOutSink regionSink)
- throws Exception {
- LOG.debug("Reading list of tables");
- List> taskFutures = new LinkedList<>();
- for (TableDescriptor td: admin.listTableDescriptors()) {
- if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
- (!td.getTableName().equals(writeTableName))) {
- LongAdder readLatency =
- regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
- taskFutures.addAll(Canary.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled,
- readLatency));
- }
- }
- return taskFutures;
- }
-
- private void checkWriteTableDistribution() throws IOException {
- if (!admin.tableExists(writeTableName)) {
- int numberOfServers =
- admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size();
- if (numberOfServers == 0) {
- throw new IllegalStateException("No live regionservers");
- }
- createWriteTable(numberOfServers);
- }
-
- if (!admin.isTableEnabled(writeTableName)) {
- admin.enableTable(writeTableName);
- }
-
- ClusterMetrics status =
- admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER));
- int numberOfServers = status.getLiveServerMetrics().size();
- if (status.getLiveServerMetrics().containsKey(status.getMasterName())) {
- numberOfServers -= 1;
- }
-
- List> pairs =
- MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
- int numberOfRegions = pairs.size();
- if (numberOfRegions < numberOfServers * regionsLowerLimit
- || numberOfRegions > numberOfServers * regionsUpperLimit) {
- admin.disableTable(writeTableName);
- admin.deleteTable(writeTableName);
- createWriteTable(numberOfServers);
- }
- HashSet serverSet = new HashSet<>();
- for (Pair pair : pairs) {
- serverSet.add(pair.getSecond());
- }
- int numberOfCoveredServers = serverSet.size();
- if (numberOfCoveredServers < numberOfServers) {
- admin.balance();
- }
- }
-
- private void createWriteTable(int numberOfServers) throws IOException {
- int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
- LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
- "(current lower limit of regions per server is {} and you can change it with config {}).",
- numberOfServers, numberOfRegions, regionsLowerLimit,
- HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
- HTableDescriptor desc = new HTableDescriptor(writeTableName);
- HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
- family.setMaxVersions(1);
- family.setTimeToLive(writeDataTTL);
-
- desc.addFamily(family);
- byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
- admin.createTable(desc, splits);
- }
- }
+ public int checkRegions(String[] targets) throws Exception;
/**
- * Canary entry point for specified table.
- * @throws Exception
- */
- private static List> sniff(final Admin admin, final Sink sink, String tableName,
- ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency)
- throws Exception {
- LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
- if (admin.isTableEnabled(TableName.valueOf(tableName))) {
- return Canary.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
- executor, taskType, rawScanEnabled, readLatency);
- } else {
- LOG.warn("Table {} is not enabled", tableName);
- }
- return new LinkedList<>();
- }
-
- /*
- * Loops over regions of this table, and outputs information about the state.
+ * Runs Canary in Region server mode.
+ *
+ * @param targets -- list of region servers to monitor.
+ * @return the exit code of the Canary tool.
*/
- private static List> sniff(final Admin admin, final Sink sink,
- TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
- boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
- LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
- try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
- List tasks = new ArrayList<>();
- try (RegionLocator regionLocator =
- admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
- for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
- ServerName rs = location.getServerName();
- RegionInfo region = location.getRegion();
- tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
- taskType, rawScanEnabled, rwLatency));
- Map regionMap = ((RegionStdOutSink) sink).getRegionMap();
- regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
- region.getTable(), rs));
- }
- return executor.invokeAll(tasks);
- }
- } catch (TableNotFoundException e) {
- return Collections.EMPTY_LIST;
- }
- }
-
- // monitor for zookeeper mode
- private static class ZookeeperMonitor extends Monitor {
- private List hosts;
- private final String znode;
- private final int timeout;
-
- protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
- Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
- super(connection, monitorTargets, useRegExp,
- sink, executor, treatFailureAsError, allowedFailures);
- Configuration configuration = connection.getConfiguration();
- znode =
- configuration.get(ZOOKEEPER_ZNODE_PARENT,
- DEFAULT_ZOOKEEPER_ZNODE_PARENT);
- timeout = configuration
- .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
- ConnectStringParser parser =
- new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
- hosts = Lists.newArrayList();
- for (InetSocketAddress server : parser.getServerAddresses()) {
- hosts.add(server.toString());
- }
- if (allowedFailures > (hosts.size() - 1) / 2) {
- LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " +
- "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
- allowedFailures, hosts.size());
- }
- }
-
- @Override public void run() {
- List tasks = Lists.newArrayList();
- ZookeeperStdOutSink zkSink = null;
- try {
- zkSink = this.getSink();
- } catch (RuntimeException e) {
- LOG.error("Run ZooKeeperMonitor failed!", e);
- this.errorCode = ERROR_EXIT_CODE;
- }
- this.initialized = true;
- for (final String host : hosts) {
- tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
- }
- try {
- for (Future future : this.executor.invokeAll(tasks)) {
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.error("Sniff zookeeper failed!", e);
- this.errorCode = ERROR_EXIT_CODE;
- }
- }
- } catch (InterruptedException e) {
- this.errorCode = ERROR_EXIT_CODE;
- Thread.currentThread().interrupt();
- LOG.error("Sniff zookeeper interrupted!", e);
- }
- this.done = true;
- }
-
- private ZookeeperStdOutSink getSink() {
- if (!(sink instanceof ZookeeperStdOutSink)) {
- throw new RuntimeException("Can only write to zookeeper sink");
- }
- return ((ZookeeperStdOutSink) sink);
- }
- }
-
+ public int checkRegionServers(String[] targets) throws Exception;
/**
- * A monitor for regionserver mode
+ * Runs Canary in Zookeeper mode.
+ *
+ * @return the exit code of the Canary tool.
*/
- private static class RegionServerMonitor extends Monitor {
- private boolean allRegions;
-
- public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
- Sink sink, ExecutorService executor, boolean allRegions,
- boolean treatFailureAsError, long allowedFailures) {
- super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
- allowedFailures);
- this.allRegions = allRegions;
- }
-
- private RegionServerStdOutSink getSink() {
- if (!(sink instanceof RegionServerStdOutSink)) {
- throw new RuntimeException("Can only write to regionserver sink");
- }
- return ((RegionServerStdOutSink) sink);
- }
-
- @Override
- public void run() {
- if (this.initAdmin() && this.checkNoTableNames()) {
- RegionServerStdOutSink regionServerSink = null;
- try {
- regionServerSink = this.getSink();
- } catch (RuntimeException e) {
- LOG.error("Run RegionServerMonitor failed!", e);
- this.errorCode = ERROR_EXIT_CODE;
- }
- Map> rsAndRMap = this.filterRegionServerByName();
- this.initialized = true;
- this.monitorRegionServers(rsAndRMap, regionServerSink);
- }
- this.done = true;
- }
-
- private boolean checkNoTableNames() {
- List foundTableNames = new ArrayList<>();
- TableName[] tableNames = null;
- LOG.debug("Reading list of tables");
- try {
- tableNames = this.admin.listTableNames();
- } catch (IOException e) {
- LOG.error("Get listTableNames failed", e);
- this.errorCode = INIT_ERROR_EXIT_CODE;
- return false;
- }
-
- if (this.targets == null || this.targets.length == 0) return true;
+ public int checkZooKeeper() throws Exception;
- for (String target : this.targets) {
- for (TableName tableName : tableNames) {
- if (target.equals(tableName.getNameAsString())) {
- foundTableNames.add(target);
- }
- }
- }
+ public Map getReadFailures();
- if (foundTableNames.size() > 0) {
- System.err.println("Cannot pass a tablename when using the -regionserver " +
- "option, tablenames:" + foundTableNames.toString());
- this.errorCode = USAGE_EXIT_CODE;
- }
- return foundTableNames.isEmpty();
- }
-
- private void monitorRegionServers(Map> rsAndRMap, RegionServerStdOutSink regionServerSink) {
- List tasks = new ArrayList<>();
- Map successMap = new HashMap<>();
- Random rand = new Random();
- for (Map.Entry> entry : rsAndRMap.entrySet()) {
- String serverName = entry.getKey();
- AtomicLong successes = new AtomicLong(0);
- successMap.put(serverName, successes);
- if (entry.getValue().isEmpty()) {
- LOG.error("Regionserver not serving any regions - {}", serverName);
- } else if (this.allRegions) {
- for (RegionInfo region : entry.getValue()) {
- tasks.add(new RegionServerTask(this.connection,
- serverName,
- region,
- regionServerSink,
- successes));
- }
- } else {
- // random select a region if flag not set
- RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
- tasks.add(new RegionServerTask(this.connection,
- serverName,
- region,
- regionServerSink,
- successes));
- }
- }
- try {
- for (Future future : this.executor.invokeAll(tasks)) {
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.error("Sniff regionserver failed!", e);
- this.errorCode = ERROR_EXIT_CODE;
- }
- }
- if (this.allRegions) {
- for (Map.Entry> entry : rsAndRMap.entrySet()) {
- String serverName = entry.getKey();
- LOG.info("Successfully read {} regions out of {} on regionserver {}",
- successMap.get(serverName), entry.getValue().size(), serverName);
- }
- }
- } catch (InterruptedException e) {
- this.errorCode = ERROR_EXIT_CODE;
- LOG.error("Sniff regionserver interrupted!", e);
- }
- }
-
- private Map> filterRegionServerByName() {
- Map> regionServerAndRegionsMap = this.getAllRegionServerByName();
- regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
- return regionServerAndRegionsMap;
- }
-
- private Map> getAllRegionServerByName() {
- Map> rsAndRMap = new HashMap<>();
- try {
- LOG.debug("Reading list of tables and locations");
- List tableDescs = this.admin.listTableDescriptors();
- List regions = null;
- for (TableDescriptor tableDesc: tableDescs) {
- try (RegionLocator regionLocator =
- this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
- for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
- ServerName rs = location.getServerName();
- String rsName = rs.getHostname();
- RegionInfo r = location.getRegion();
- if (rsAndRMap.containsKey(rsName)) {
- regions = rsAndRMap.get(rsName);
- } else {
- regions = new ArrayList<>();
- rsAndRMap.put(rsName, regions);
- }
- regions.add(r);
- }
- }
- }
-
- // get any live regionservers not serving any regions
- for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
- .getLiveServerMetrics().keySet()) {
- String rsName = rs.getHostname();
- if (!rsAndRMap.containsKey(rsName)) {
- rsAndRMap.put(rsName, Collections. emptyList());
- }
- }
- } catch (IOException e) {
- LOG.error("Get HTables info failed", e);
- this.errorCode = INIT_ERROR_EXIT_CODE;
- }
- return rsAndRMap;
- }
-
- private Map> doFilterRegionServerByName(
- Map> fullRsAndRMap) {
-
- Map> filteredRsAndRMap = null;
-
- if (this.targets != null && this.targets.length > 0) {
- filteredRsAndRMap = new HashMap<>();
- Pattern pattern = null;
- Matcher matcher = null;
- boolean regExpFound = false;
- for (String rsName : this.targets) {
- if (this.useRegExp) {
- regExpFound = false;
- pattern = Pattern.compile(rsName);
- for (Map.Entry> entry : fullRsAndRMap.entrySet()) {
- matcher = pattern.matcher(entry.getKey());
- if (matcher.matches()) {
- filteredRsAndRMap.put(entry.getKey(), entry.getValue());
- regExpFound = true;
- }
- }
- if (!regExpFound) {
- LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
- }
- } else {
- if (fullRsAndRMap.containsKey(rsName)) {
- filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
- } else {
- LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
- }
- }
- }
- } else {
- filteredRsAndRMap = fullRsAndRMap;
- }
- return filteredRsAndRMap;
- }
- }
-
- public static void main(String[] args) throws Exception {
- final Configuration conf = HBaseConfiguration.create();
-
- // Loading the generic options to conf
- new GenericOptionsParser(conf, args);
-
- int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
- LOG.info("Execution thread count={}", numThreads);
-
- int exitCode = 0;
- ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
- try {
- exitCode = ToolRunner.run(conf, new Canary(executor), args);
- } finally {
- executor.shutdown();
- }
- System.exit(exitCode);
- }
-}
+ public Map getWriteFailures();
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java
new file mode 100644
index 000000000000..0be7da2f81f4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java
@@ -0,0 +1,1866 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.tool;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
+ *
+ * There are three modes:
+ * <ol>
+ * <li>region mode (Default): For each region, try to get one row per column family outputting
+ * information on failure (ERROR) or else the latency.
+ * </li>
+ *
+ * <li>regionserver mode: For each regionserver try to get one row from one table selected
+ * randomly outputting information on failure (ERROR) or else the latency.
+ * </li>
+ *
+ * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
+ * failure (ERROR) or else the latency.
+ * </li>
+ * </ol>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class CanaryTool implements Tool, Canary {
+
+  /**
+   * Runs the canary in region mode (the default mode).
+   * <p>
+   * If {@code HBASE_CANARY_REGION_READ_TABLE_TIMEOUT} is set in the configuration, its value is
+   * handed to {@code populateReadTableTimeoutsMap} first; a value that method rejects is logged
+   * and reported as a usage error without starting a monitor.
+   *
+   * @param targets what to probe, passed unchanged to the monitor run; may be {@code null}
+   * @return {@code USAGE_EXIT_CODE} if the read-timeout setting is rejected, otherwise the exit
+   *   code of the monitor run
+   * @throws Exception if the monitor run fails
+   */
+  @Override
+  public int checkRegions(String[] targets) throws Exception {
+    // The mode flags are instance state that the other check* methods switch on. Clear them so
+    // a region check on a reused instance does not keep running in an earlier mode.
+    // NOTE(review): assumes the monitor factory selects the mode from these flags -- confirm.
+    regionServerMode = false;
+    zookeeperMode = false;
+
+    String readTableTimeouts = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
+    if (readTableTimeouts != null) {
+      try {
+        populateReadTableTimeoutsMap(readTableTimeouts);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Constructing read table timeouts map failed ", e);
+        return USAGE_EXIT_CODE;
+      }
+    }
+    return runMonitor(targets);
+  }
+
+  /**
+   * Runs the canary in regionserver mode.
+   *
+   * @param targets what to probe, passed unchanged to the monitor run; may be {@code null}
+   * @return the exit code of the monitor run
+   * @throws Exception if the monitor run fails
+   */
+  @Override
+  public int checkRegionServers(String[] targets) throws Exception {
+    regionServerMode = true;
+    // Clear the other mode flag so an earlier checkZooKeeper() on this instance cannot leak
+    // into this run.
+    // NOTE(review): assumes the monitor factory selects the mode from these flags -- confirm.
+    zookeeperMode = false;
+    return runMonitor(targets);
+  }
+
+  /**
+   * Runs the canary in zookeeper mode. No targets are passed to the monitor run.
+   *
+   * @return the exit code of the monitor run
+   * @throws Exception if the monitor run fails
+   */
+  @Override
+  public int checkZooKeeper() throws Exception {
+    zookeeperMode = true;
+    // Clear the other mode flag so an earlier checkRegionServers() on this instance cannot
+    // take precedence over zookeeper mode.
+    // NOTE(review): assumes the monitor factory selects the mode from these flags -- confirm.
+    regionServerMode = false;
+    return runMonitor(null);
+  }
+
+  /**
+   * Sink interface used by the canary to output information.
+   * <p>
+   * A sink keeps four counters (read/write x success/failure) and two failure maps. The
+   * {@code inc*} methods bump a counter and return a {@code long}; the {@code update*Failures}
+   * methods record a region name together with a server name.
+   */
+  public interface Sink {
+    long getReadFailureCount();
+
+    long incReadFailureCount();
+
+    /** @return the recorded read failures, region name to server name */
+    Map<String, String> getReadFailures();
+
+    void updateReadFailures(String regionName, String serverName);
+
+    long getWriteFailureCount();
+
+    long incWriteFailureCount();
+
+    /** @return the recorded write failures, region name to server name */
+    Map<String, String> getWriteFailures();
+
+    void updateWriteFailures(String regionName, String serverName);
+
+    long getReadSuccessCount();
+
+    long incReadSuccessCount();
+
+    long getWriteSuccessCount();
+
+    long incWriteSuccessCount();
+  }
+
+  /**
+   * Simple implementation of canary sink that allows plotting to a file or standard output.
+   * <p>
+   * Counters are {@link AtomicLong}s and the failure maps are {@link ConcurrentHashMap}s. The
+   * failure-map getters hand out the live maps, not copies.
+   */
+  public static class StdOutSink implements Sink {
+    private final AtomicLong readFailureCount = new AtomicLong(0);
+    private final AtomicLong writeFailureCount = new AtomicLong(0);
+    private final AtomicLong readSuccessCount = new AtomicLong(0);
+    private final AtomicLong writeSuccessCount = new AtomicLong(0);
+    /** Region name to server name, for failed reads. */
+    private final Map<String, String> readFailures = new ConcurrentHashMap<>();
+    /** Region name to server name, for failed writes. */
+    private final Map<String, String> writeFailures = new ConcurrentHashMap<>();
+
+    @Override
+    public long getReadFailureCount() {
+      return readFailureCount.get();
+    }
+
+    /** @return the read-failure count after incrementing it */
+    @Override
+    public long incReadFailureCount() {
+      return readFailureCount.incrementAndGet();
+    }
+
+    @Override
+    public Map<String, String> getReadFailures() {
+      return readFailures;
+    }
+
+    /** Records (or overwrites) the server on which a read of {@code regionName} failed. */
+    @Override
+    public void updateReadFailures(String regionName, String serverName) {
+      readFailures.put(regionName, serverName);
+    }
+
+    @Override
+    public long getWriteFailureCount() {
+      return writeFailureCount.get();
+    }
+
+    /** @return the write-failure count after incrementing it */
+    @Override
+    public long incWriteFailureCount() {
+      return writeFailureCount.incrementAndGet();
+    }
+
+    @Override
+    public Map<String, String> getWriteFailures() {
+      return writeFailures;
+    }
+
+    /** Records (or overwrites) the server on which a write to {@code regionName} failed. */
+    @Override
+    public void updateWriteFailures(String regionName, String serverName) {
+      writeFailures.put(regionName, serverName);
+    }
+
+    @Override
+    public long getReadSuccessCount() {
+      return readSuccessCount.get();
+    }
+
+    /** @return the read-success count after incrementing it */
+    @Override
+    public long incReadSuccessCount() {
+      return readSuccessCount.incrementAndGet();
+    }
+
+    @Override
+    public long getWriteSuccessCount() {
+      return writeSuccessCount.get();
+    }
+
+    /** @return the write-success count after incrementing it */
+    @Override
+    public long incWriteSuccessCount() {
+      return writeSuccessCount.incrementAndGet();
+    }
+  }
+
+ /**
+ * By RegionServer, for 'regionserver' mode.
+ */
+ public static class RegionServerStdOutSink extends StdOutSink {
+ /**
+ * Counts one read failure and logs the table/server pair at ERROR level.
+ */
+ public void publishReadFailure(String table, String server) {
+ incReadFailureCount();
+ LOG.error("Read from {} on {}", table, server);
+ }
+
+ /**
+ * Logs a read latency at INFO level. Does not touch any counter.
+ * @param msTime value logged with an "ms" suffix
+ */
+ public void publishReadTiming(String table, String server, long msTime) {
+ LOG.info("Read from {} on {} in {}ms", table, server, msTime);
+ }
+ }
+
+ /**
+ * Output for 'zookeeper' mode.
+ */
+ public static class ZookeeperStdOutSink extends StdOutSink {
+ /**
+ * Counts one read failure and logs the znode/server pair at ERROR level.
+ */
+ public void publishReadFailure(String znode, String server) {
+ incReadFailureCount();
+ LOG.error("Read from {} on {}", znode, server);
+ }
+
+ /**
+ * Logs a znode read latency at INFO level. Does not touch any counter.
+ * @param msTime value logged with an "ms" suffix
+ */
+ public void publishReadTiming(String znode, String server, long msTime) {
+ LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
+ }
+ }
+
+  /**
+   * By Region, for 'region' mode.
+   * <p>
+   * Besides the counters inherited from {@link StdOutSink}, this sink keeps a per-table read
+   * latency accumulator, a single write latency accumulator and a map of per-region results
+   * keyed by region name.
+   */
+  public static class RegionStdOutSink extends StdOutSink {
+    /** Read latency accumulators, keyed by the table name given at initialization. */
+    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
+    private LongAdder writeLatency = new LongAdder();
+    /** Per-region results, keyed by {@code RegionInfo#getRegionNameAsString()}. */
+    private final Map<String, RegionTaskResult> regionMap = new ConcurrentHashMap<>();
+
+    /** Counts a read failure for a region and logs it with the cause. */
+    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
+      incReadFailureCount();
+      LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e);
+    }
+
+    /** Counts a read failure for one column family of a region and logs it with the cause. */
+    public void publishReadFailure(ServerName serverName, RegionInfo region,
+        ColumnFamilyDescriptor column, Exception e) {
+      incReadFailureCount();
+      LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName,
+        column.getNameAsString(), e);
+    }
+
+    /**
+     * Counts a successful read, marks the region's result as read-successful with the given
+     * latency, and logs the timing. The region must already have an entry in the region map;
+     * the lookup result is dereferenced without a null check.
+     */
+    public void publishReadTiming(ServerName serverName, RegionInfo region,
+        ColumnFamilyDescriptor column, long msTime) {
+      incReadSuccessCount();
+      RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
+      res.setReadSuccess();
+      res.setReadLatency(msTime);
+      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
+        column.getNameAsString(), msTime);
+    }
+
+    /** Counts a write failure for a region and logs it with the cause. */
+    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
+      incWriteFailureCount();
+      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
+    }
+
+    /** Counts a write failure for one column family of a region and logs it with the cause. */
+    public void publishWriteFailure(ServerName serverName, RegionInfo region,
+        ColumnFamilyDescriptor column, Exception e) {
+      incWriteFailureCount();
+      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
+        column.getNameAsString(), e);
+    }
+
+    /**
+     * Counts a successful write, marks the region's result as write-successful with the given
+     * latency, and logs the timing. The region must already have an entry in the region map;
+     * the lookup result is dereferenced without a null check.
+     */
+    public void publishWriteTiming(ServerName serverName, RegionInfo region,
+        ColumnFamilyDescriptor column, long msTime) {
+      incWriteSuccessCount();
+      RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
+      res.setWriteSuccess();
+      res.setWriteLatency(msTime);
+      LOG.info("Write to {} on {} {} in {}ms",
+        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
+    }
+
+    /** @return the live per-table read latency map (not a copy) */
+    public Map<String, LongAdder> getReadLatencyMap() {
+      return this.perTableReadLatency;
+    }
+
+    /**
+     * Installs a fresh accumulator for {@code tableName}, replacing any previous one.
+     * @return the newly installed accumulator
+     */
+    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
+      LongAdder initLatency = new LongAdder();
+      this.perTableReadLatency.put(tableName, initLatency);
+      return initLatency;
+    }
+
+    /** Resets the write latency accumulator. */
+    public void initializeWriteLatency() {
+      this.writeLatency.reset();
+    }
+
+    public LongAdder getWriteLatency() {
+      return this.writeLatency;
+    }
+
+    /** @return the live per-region result map (not a copy) */
+    public Map<String, RegionTaskResult> getRegionMap() {
+      return this.regionMap;
+    }
+
+    /** @return the number of entries currently in the region map */
+    public int getTotalExpectedRegions() {
+      return this.regionMap.size();
+    }
+  }
+
+  /**
+   * Run a single zookeeper Task and then exit.
+   * <p>
+   * Opens its own ZooKeeper session to {@code host}, times one {@code getData} call on
+   * {@code znode}, and publishes the timing or the failure to the sink.
+   */
+  static class ZookeeperTask implements Callable<Void> {
+    // Stored but not used by call(); the task opens its own ZooKeeper session.
+    private final Connection connection;
+    private final String host;
+    private String znode;
+    private final int timeout;
+    private ZookeeperStdOutSink sink;
+
+    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
+        ZookeeperStdOutSink sink) {
+      this.connection = connection;
+      this.host = host;
+      this.znode = znode;
+      this.timeout = timeout;
+      this.sink = sink;
+    }
+
+    /**
+     * @return always {@code null}
+     * @throws Exception if creating or closing the ZooKeeper session fails
+     */
+    @Override
+    public Void call() throws Exception {
+      ZooKeeper zooKeeper = null;
+      boolean interrupted = false;
+      try {
+        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
+        Stat exists = zooKeeper.exists(znode, false);
+        // Only the getData call is timed; session setup and the exists() check are not.
+        StopWatch stopwatch = new StopWatch();
+        stopwatch.start();
+        zooKeeper.getData(znode, false, exists);
+        stopwatch.stop();
+        sink.publishReadTiming(znode, host, stopwatch.getTime());
+      } catch (KeeperException e) {
+        sink.publishReadFailure(znode, host);
+      } catch (InterruptedException e) {
+        sink.publishReadFailure(znode, host);
+        interrupted = true;
+      } finally {
+        try {
+          if (zooKeeper != null) {
+            zooKeeper.close();
+          }
+        } finally {
+          // Restore the interrupt status only after close(), so that the close itself is not
+          // disturbed but the executing thread still learns it was interrupted.
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Run a single Region Task and then exit. For each column family of the Region, get one row and
+   * output latency or failure.
+   * <p>
+   * Depending on the {@link TaskType} the task either reads one row per column family or writes
+   * one cell per column family. Every failure is published to the sink and also recorded in the
+   * sink's read/write failure map.
+   */
+  static class RegionTask implements Callable<Void> {
+    public enum TaskType {
+      READ, WRITE
+    }
+
+    private Connection connection;
+    private RegionInfo region;
+    private RegionStdOutSink sink;
+    private TaskType taskType;
+    private boolean rawScanEnabled;
+    private ServerName serverName;
+    /** Accumulator to which each successful operation's latency is added. */
+    private LongAdder readWriteLatency;
+
+    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
+        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
+      this.connection = connection;
+      this.region = region;
+      this.serverName = serverName;
+      this.sink = sink;
+      this.taskType = taskType;
+      this.rawScanEnabled = rawScanEnabled;
+      this.readWriteLatency = rwLatency;
+    }
+
+    /** Dispatches to {@link #read()} or {@code write()}; always returns {@code null}. */
+    @Override
+    public Void call() {
+      switch (taskType) {
+        case READ:
+          return read();
+        case WRITE:
+          return write();
+        default:
+          return read();
+      }
+    }
+
+    /**
+     * Reads one row from every column family of the region, publishing the latency or the
+     * failure of each read to the sink.
+     *
+     * @return always {@code null}
+     */
+    public Void read() {
+      Table table = null;
+      TableDescriptor tableDesc = null;
+      try {
+        LOG.debug("Reading table descriptor for table {}", region.getTable());
+        table = connection.getTable(region.getTable());
+        tableDesc = table.getDescriptor();
+      } catch (IOException e) {
+        // Two placeholders need two arguments; the trailing exception is logged as the cause.
+        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), region.getTable(), e);
+        sink.publishReadFailure(serverName, region, e);
+        // Record the failure in the failure map too, as the per-column path below does.
+        sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            // Log the close failure itself, not the earlier descriptor failure.
+            LOG.error("Close table failed", ioe);
+          }
+        }
+        return null;
+      }
+
+      StopWatch stopWatch = new StopWatch();
+      for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
+        stopWatch.reset();
+        byte[] startKey = region.getStartKey();
+        Get get = null;
+        Scan scan = null;
+        // Can't do a get on empty start row so do a Scan of first element if any instead.
+        if (startKey.length > 0) {
+          get = new Get(startKey);
+          get.setCacheBlocks(false);
+          get.setFilter(new FirstKeyOnlyFilter());
+          get.addFamily(column.getName());
+        } else {
+          scan = new Scan();
+          LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName());
+          scan.setRaw(rawScanEnabled);
+          scan.setCaching(1);
+          scan.setCacheBlocks(false);
+          scan.setFilter(new FirstKeyOnlyFilter());
+          scan.addFamily(column.getName());
+          scan.setMaxResultSize(1L);
+          scan.setOneRowLimit();
+        }
+        LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(),
+          region.getRegionNameAsString(), column.getNameAsString(),
+          Bytes.toStringBinary(startKey));
+        // Scoped to one iteration so an already-closed scanner is never closed again later.
+        ResultScanner rs = null;
+        try {
+          stopWatch.start();
+          if (startKey.length > 0) {
+            table.get(get);
+          } else {
+            rs = table.getScanner(scan);
+            rs.next();
+          }
+          stopWatch.stop();
+          this.readWriteLatency.add(stopWatch.getTime());
+          sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
+        } catch (Exception e) {
+          sink.publishReadFailure(serverName, region, column, e);
+          sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
+        } finally {
+          if (rs != null) {
+            rs.close();
+          }
+        }
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.error("Close table failed", e);
+      }
+      return null;
+    }
+
+    /**
+     * Check writes for the canary table: puts one random value into every column family of the
+     * region, publishing the latency or the failure of each put to the sink.
+     *
+     * @return always {@code null}
+     */
+    private Void write() {
+      // try-with-resources closes the table even when fetching the descriptor fails; an
+      // IOException from close() is still handled by the catch below.
+      try (Table table = connection.getTable(region.getTable())) {
+        TableDescriptor tableDesc = table.getDescriptor();
+        byte[] rowToCheck = region.getStartKey();
+        if (rowToCheck.length == 0) {
+          rowToCheck = new byte[]{0x0};
+        }
+        int writeValueSize =
+          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
+        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
+          Put put = new Put(rowToCheck);
+          byte[] value = new byte[writeValueSize];
+          Bytes.random(value);
+          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
+
+          LOG.debug("Writing to {} {} {} {}",
+            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
+            Bytes.toStringBinary(rowToCheck));
+          try {
+            long startTime = System.currentTimeMillis();
+            table.put(put);
+            long time = System.currentTimeMillis() - startTime;
+            this.readWriteLatency.add(time);
+            sink.publishWriteTiming(serverName, region, column, time);
+          } catch (Exception e) {
+            sink.publishWriteFailure(serverName, region, column, e);
+            // Record the failed put in the failure map, mirroring the read path.
+            sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
+          }
+        }
+      } catch (IOException e) {
+        sink.publishWriteFailure(serverName, region, e);
+        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Run a single RegionServer Task and then exit.
+   * Get one row from a region on the regionserver and output latency or the failure.
+   */
+  static class RegionServerTask implements Callable<Void> {
+    private Connection connection;
+    private String serverName;
+    private RegionInfo region;
+    private RegionServerStdOutSink sink;
+    /** Incremented once for every probe that counts as a success. */
+    private AtomicLong successes;
+
+    RegionServerTask(Connection connection, String serverName, RegionInfo region,
+        RegionServerStdOutSink sink, AtomicLong successes) {
+      this.connection = connection;
+      this.serverName = serverName;
+      this.region = region;
+      this.sink = sink;
+      this.successes = successes;
+    }
+
+    /**
+     * Reads one row from the region and publishes the latency or the failure.
+     * A missing table is logged and neither counted as success nor failure; a disabled table
+     * counts as a success.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void call() {
+      TableName tableName = null;
+      Table table = null;
+      StopWatch stopWatch = new StopWatch();
+      // monitor one region on every region server
+      try {
+        tableName = region.getTable();
+        table = connection.getTable(tableName);
+        byte[] startKey = region.getStartKey();
+        // Can't do a get on empty start row so do a Scan of first element if any instead.
+        LOG.debug("Reading from {} {} {} {}",
+          serverName, region.getTable(), region.getRegionNameAsString(),
+          Bytes.toStringBinary(startKey));
+        if (startKey.length > 0) {
+          Get get = new Get(startKey);
+          get.setCacheBlocks(false);
+          get.setFilter(new FirstKeyOnlyFilter());
+          stopWatch.start();
+          table.get(get);
+          stopWatch.stop();
+        } else {
+          Scan scan = new Scan();
+          scan.setCacheBlocks(false);
+          scan.setFilter(new FirstKeyOnlyFilter());
+          scan.setCaching(1);
+          scan.setMaxResultSize(1L);
+          scan.setOneRowLimit();
+          stopWatch.start();
+          // try-with-resources so the scanner is closed even when next() throws.
+          try (ResultScanner s = table.getScanner(scan)) {
+            s.next();
+          }
+          stopWatch.stop();
+        }
+        successes.incrementAndGet();
+        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
+      } catch (TableNotFoundException tnfe) {
+        LOG.error("Table may be deleted", tnfe);
+        // This is ignored because it doesn't imply that the regionserver is dead
+      } catch (TableNotEnabledException tnee) {
+        // This is considered a success since we got a response.
+        successes.incrementAndGet();
+        LOG.debug("The targeted table was disabled. Assuming success.");
+      } catch (DoNotRetryIOException dnrioe) {
+        sink.publishReadFailure(tableName.getNameAsString(), serverName);
+        LOG.error(dnrioe.toString(), dnrioe);
+      } catch (IOException e) {
+        sink.publishReadFailure(tableName.getNameAsString(), serverName);
+        LOG.error(e.toString(), e);
+      } finally {
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException e) {
+            // Best effort: a failed close is logged and does not affect the probe result.
+            LOG.error("Close table failed", e);
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+ // Process exit codes. USAGE_EXIT_CODE is what printUsageAndExit() exits with.
+ private static final int USAGE_EXIT_CODE = 1;
+ // Returned by runMonitor when a run is abandoned before the monitor reports it is initialized.
+ private static final int INIT_ERROR_EXIT_CODE = 2;
+ // Returned by runMonitor when an initialized monitor runs past the configured timeout.
+ private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
+ private static final int ERROR_EXIT_CODE = 4;
+ private static final int FAILURE_EXIT_CODE = 5;
+
+ // Milliseconds between runs when -daemon is given without -interval.
+ private static final long DEFAULT_INTERVAL = 60000;
+
+ private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
+ private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
+
+ // NOTE(review): the logger is named after Canary, not CanaryTool -- presumably to keep the
+ // existing logger name; confirm this is intentional.
+ private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
+
+ public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
+ NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
+
+ private static final String CANARY_TABLE_FAMILY_NAME = "Test";
+
+ private Configuration conf = null;
+ // Milliseconds to sleep between monitor runs; 0 means run once.
+ private long interval = 0;
+ // Sink supplied at construction time (may be null); getSink() prefers it when non-null.
+ private Sink sink = null;
+
+ /**
+ * True if we are to run in 'regionServer' mode.
+ */
+ private boolean regionServerMode = false;
+
+ /**
+ * True if we are to run in zookeeper 'mode'.
+ */
+ private boolean zookeeperMode = false;
+
+ /**
+ * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
+ * we aggregate time to fetch each region and it needs to be less than this value else we
+ * log an ERROR.
+ */
+ private HashMap configuredReadTableTimeouts = new HashMap<>();
+
+ // Configuration keys. parseArgs() stores the matching command line options under these keys.
+ public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS
+ = "hbase.canary.regionserver_all_regions";
+
+ public static final String HBASE_CANARY_REGION_WRITE_SNIFFING
+ = "hbase.canary.region.write.sniffing";
+ public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT
+ = "hbase.canary.region.write.table.timeout";
+ public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME
+ = "hbase.canary.region.write.table.name";
+ public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT
+ = "hbase.canary.region.read.table.timeout";
+
+ public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES
+ = "hbase.canary.zookeeper.permitted.failures";
+
+ public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
+ public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
+ public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
+
+
+ private ExecutorService executor; // threads to retrieve data from regionservers
+
+ /**
+ * Creates a canary backed by its own single-threaded {@link ScheduledThreadPoolExecutor}.
+ * NOTE(review): nothing visible here shuts that executor down -- confirm who owns it.
+ */
+ public CanaryTool() {
+ this(new ScheduledThreadPoolExecutor(1));
+ }
+
+ /**
+ * Creates a canary that uses the given executor and no preset sink.
+ */
+ public CanaryTool(ExecutorService executor) {
+ this(executor, null);
+ }
+
+ /**
+ * Creates a canary with the given executor and sink. The configuration is left unset.
+ */
+ @VisibleForTesting
+ CanaryTool(ExecutorService executor, Sink sink) {
+ this.executor = executor;
+ this.sink = sink;
+ }
+
+ /**
+ * Creates a canary with the given configuration and executor and no preset sink.
+ */
+ CanaryTool(Configuration conf, ExecutorService executor) {
+ this(conf, executor, null);
+ }
+
+ /**
+ * Creates a canary with the given executor and sink, then applies {@code conf} through
+ * {@link #setConf(Configuration)}, which substitutes a default configuration for null.
+ */
+ CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
+ this(executor, sink);
+ setConf(conf);
+ }
+
+ /**
+ * Returns the configuration last stored by {@link #setConf(Configuration)}, or null if none
+ * has been set yet.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+  /**
+   * Stores the configuration this tool works with.
+   *
+   * @param conf the configuration to use; when {@code null}, a fresh configuration from
+   *   {@code HBaseConfiguration.create()} is stored instead
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = (conf != null) ? conf : HBaseConfiguration.create();
+  }
+
+  /**
+   * Parses the command line, storing option values in the configuration and in the mode and
+   * interval fields. Any invalid option or combination prints the usage and exits the JVM.
+   *
+   * @param args the command line, in the form {@code [opts] [target 1 [target 2 ...]]}
+   * @return the index in {@code args} of the first non-option argument, or -1 if there is none
+   */
+  private int parseArgs(String[] args) {
+    int index = -1;
+    long permittedFailures = 0;
+    boolean regionServerAllRegions = false, writeSniffing = false;
+    String readTableTimeoutsStr = null;
+
+    // Process command line args
+    for (int i = 0; i < args.length; i++) {
+      String cmd = args[i];
+
+      if (cmd.startsWith("-")) {
+        if (index >= 0) {
+          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
+          System.err.println("Invalid command line options");
+          printUsageAndExit();
+        }
+
+        if (cmd.equals("-help") || cmd.equals("-h")) {
+          // user asked for help, print the help and quit.
+          printUsageAndExit();
+        } else if (cmd.equals("-daemon")) {
+          // user asked for daemon mode, set a default interval between checks. An interval
+          // already given with -interval is kept; -daemon must still be accepted in that case
+          // rather than fall through to the "invalid option" branch.
+          if (interval == 0) {
+            interval = DEFAULT_INTERVAL;
+          }
+        } else if (cmd.equals("-interval")) {
+          // user has specified an interval for canary breaths (-interval N)
+          String value = optionValueOrExit(args, ++i,
+            "-interval takes a numeric seconds value argument.");
+          interval =
+            parseLongOptionOrExit(value, "-interval needs a numeric value argument.") * 1000;
+        } else if (cmd.equals("-zookeeper")) {
+          this.zookeeperMode = true;
+        } else if (cmd.equals("-regionserver")) {
+          this.regionServerMode = true;
+        } else if (cmd.equals("-allRegions")) {
+          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
+          regionServerAllRegions = true;
+        } else if (cmd.equals("-writeSniffing")) {
+          writeSniffing = true;
+          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
+        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
+          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
+        } else if (cmd.equals("-e")) {
+          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
+        } else if (cmd.equals("-t")) {
+          String message = "-t takes a numeric milliseconds value argument.";
+          long timeout = parseLongOptionOrExit(optionValueOrExit(args, ++i, message), message);
+          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
+        } else if (cmd.equals("-writeTableTimeout")) {
+          String message = "-writeTableTimeout takes a numeric milliseconds value argument.";
+          long configuredWriteTableTimeout =
+            parseLongOptionOrExit(optionValueOrExit(args, ++i, message), message);
+          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
+        } else if (cmd.equals("-writeTable")) {
+          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, optionValueOrExit(args, ++i,
+            "-writeTable takes a string tablename value argument."));
+        } else if (cmd.equals("-f")) {
+          String value = optionValueOrExit(args, ++i,
+            "-f needs a boolean value argument (true|false).");
+          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(value));
+        } else if (cmd.equals("-readTableTimeouts")) {
+          readTableTimeoutsStr = optionValueOrExit(args, ++i,
+            "-readTableTimeouts needs a comma-separated list of read " +
+              "millisecond timeouts per table (without spaces).");
+          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
+        } else if (cmd.equals("-permittedZookeeperFailures")) {
+          String message = "-permittedZookeeperFailures needs a numeric value argument.";
+          permittedFailures =
+            parseLongOptionOrExit(optionValueOrExit(args, ++i, message), message);
+          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
+        } else {
+          // no options match
+          System.err.println(cmd + " options is invalid.");
+          printUsageAndExit();
+        }
+      } else if (index < 0) {
+        // keep track of first table name specified by the user
+        index = i;
+      }
+    }
+    if (regionServerAllRegions && !this.regionServerMode) {
+      System.err.println("-allRegions can only be specified in regionserver mode.");
+      printUsageAndExit();
+    }
+    if (this.zookeeperMode) {
+      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
+        System.err.println("-zookeeper is exclusive and cannot be combined with "
+          + "other modes.");
+        printUsageAndExit();
+      }
+    }
+    if (permittedFailures != 0 && !this.zookeeperMode) {
+      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
+      printUsageAndExit();
+    }
+    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
+      System.err.println("-readTableTimeouts can only be configured in region mode.");
+      printUsageAndExit();
+    }
+    return index;
+  }
+
+  /**
+   * Returns the option value at {@code valueIndex}, or prints {@code missingValueMessage} and
+   * the usage and exits when the command line ends before it.
+   */
+  private String optionValueOrExit(String[] args, int valueIndex, String missingValueMessage) {
+    if (valueIndex == args.length) {
+      System.err.println(missingValueMessage);
+      printUsageAndExit();
+    }
+    return args[valueIndex];
+  }
+
+  /**
+   * Parses {@code value} as a long, or prints {@code invalidValueMessage} and the usage and
+   * exits when it is not numeric.
+   */
+  private long parseLongOptionOrExit(String value, String invalidValueMessage) {
+    try {
+      return Long.parseLong(value);
+    } catch (NumberFormatException e) {
+      System.err.println(invalidValueMessage);
+      printUsageAndExit();
+      return 0; // not reached: printUsageAndExit() terminates the JVM
+    }
+  }
+
+  /**
+   * Tool entry point: parses the command line and runs the check for the selected mode.
+   *
+   * @param args the command line; everything from the first non-option argument onwards is
+   *   treated as the list of monitor targets
+   * @return the exit code of the selected check
+   * @throws Exception if the selected check fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    int firstTargetIndex = parseArgs(args);
+
+    // No non-option argument means "no explicit targets", which is passed on as null.
+    String[] monitorTargets = null;
+    if (firstTargetIndex >= 0) {
+      monitorTargets = Arrays.copyOfRange(args, firstTargetIndex, args.length);
+    }
+
+    if (zookeeperMode) {
+      return checkZooKeeper();
+    }
+    if (regionServerMode) {
+      return checkRegionServers(monitorTargets);
+    }
+    return checkRegions(monitorTargets);
+  }
+
+  /**
+   * Runs a monitor over {@code monitorTargets}, repeating every {@code interval} milliseconds
+   * while the interval is positive, and polling each run once a second for errors and timeout.
+   *
+   * @param monitorTargets targets handed to the monitor factory; may be {@code null}
+   * @return the monitor's error code, or {@code INIT_ERROR_EXIT_CODE} /
+   *   {@code TIMEOUT_ERROR_EXIT_CODE} when a run is abandoned early
+   * @throws Exception if the connection cannot be created or the calling thread is interrupted
+   */
+  private int runMonitor(String[] monitorTargets) throws Exception {
+    ChoreService choreService = null;
+
+    // Launches chore for refreshing kerberos credentials if security is enabled.
+    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
+    // for more details.
+    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
+    if (authChore != null) {
+      choreService = new ChoreService("CANARY_TOOL");
+      choreService.scheduleChore(authChore);
+    }
+
+    // The outer try/finally makes sure the chore service is shut down on every way out of this
+    // method, including the early returns below and exceptions.
+    try {
+      // Start to prepare the stuffs
+      Monitor monitor = null;
+      Thread monitorThread;
+      long startTime = 0;
+      long currentTimeLength = 0;
+      boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
+      long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
+      // Get a connection to use in below.
+      try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
+        do {
+          // Do monitor !!
+          try {
+            monitor = this.newMonitor(connection, monitorTargets);
+            monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
+            startTime = System.currentTimeMillis();
+            monitorThread.start();
+            while (!monitor.isDone()) {
+              // wait for 1 sec
+              Thread.sleep(1000);
+              // exit if any error occurs
+              if (failOnError && monitor.hasError()) {
+                monitorThread.interrupt();
+                if (monitor.initialized) {
+                  return monitor.errorCode;
+                } else {
+                  return INIT_ERROR_EXIT_CODE;
+                }
+              }
+              currentTimeLength = System.currentTimeMillis() - startTime;
+              if (currentTimeLength > timeout) {
+                LOG.error("The monitor is running too long ({}) after timeout limit:{}"
+                  + " will be killed itself !!", currentTimeLength, timeout);
+                if (monitor.initialized) {
+                  return TIMEOUT_ERROR_EXIT_CODE;
+                } else {
+                  return INIT_ERROR_EXIT_CODE;
+                }
+              }
+            }
+
+            if (failOnError && monitor.finalCheckForErrors()) {
+              monitorThread.interrupt();
+              return monitor.errorCode;
+            }
+          } finally {
+            if (monitor != null) {
+              monitor.close();
+            }
+          }
+
+          Thread.sleep(interval);
+        } while (interval > 0);
+      } // try-with-resources close
+
+      return monitor.errorCode;
+    } finally {
+      if (choreService != null) {
+        choreService.shutdown();
+      }
+    }
+  }
+
+ /**
+ * Returns the read-failure map held by this tool's sink (whatever the sink hands out).
+ * NOTE(review): the sink field is null unless one was passed to a constructor; whether it is
+ * assigned elsewhere is not visible here -- confirm this cannot NPE before a run.
+ */
+ @Override
+ public Map getReadFailures() {
+ return sink.getReadFailures();
+ }
+
+ /**
+ * Returns the write-failure map held by this tool's sink (whatever the sink hands out).
+ * NOTE(review): the sink field is null unless one was passed to a constructor; whether it is
+ * assigned elsewhere is not visible here -- confirm this cannot NPE before a run.
+ */
+ @Override
+ public Map getWriteFailures() {
+ return sink.getWriteFailures();
+ }
+
+  /**
+   * Prints the command line usage to standard error and terminates the JVM with
+   * {@code USAGE_EXIT_CODE}. The option list mirrors the options accepted by
+   * {@code parseArgs}.
+   */
+  private void printUsageAndExit() {
+    System.err.println(
+      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2>]...] | [<REGIONSERVER1> [<REGIONSERVER2>]...]");
+    System.err.println("Where [OPTIONS] are:");
+    System.err.println(" -h,-help        show this help and exit.");
+    System.err.println(" -regionserver   set 'regionserver mode'; probes regionservers " +
+        "instead of regions");
+    System.err.println(" -allRegions     probe ALL regions when in 'regionserver mode'");
+    System.err.println(" -zookeeper      set 'zookeeper mode'; probes a znode on each " +
+        "ensemble member");
+    System.err.println(" -daemon         continuous check at defined intervals.");
+    System.err.println(" -interval <N>   interval between checks in seconds");
+    System.err.println(" -e              consider table/regionserver argument as regular " +
+        "expression");
+    System.err.println(" -f <B>          exit on first error; default=true");
+    System.err.println(" -failureAsError treat read/write failure as error");
+    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
+    System.err.println(" -writeSniffing  enable write sniffing");
+    System.err.println(" -writeTable <T> the table used for write sniffing; " +
+        "default=hbase:canary");
+    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
+    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
+        "<tableName>=<read timeout>,...");
+    System.err.println("                 comma-separated list of table read timeouts " +
+        "(no spaces);");
+    System.err.println("                 logs 'ERROR' if takes longer. default=600000ms");
+    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
+    System.err.println("                 connect to individual zookeeper nodes in ensemble");
+    System.err.println("");
+    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
+    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
+        "raw scan; default=false");
+    System.err.println("");
+    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
+        "zookeeper.");
+    System.err.println("To sniff/probe all regions, pass no arguments.");
+    System.err.println("To sniff/probe all regions of a table, pass tablename.");
+    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
+    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
+    System.exit(USAGE_EXIT_CODE);
+  }
+
+  /**
+   * Chooses the sink to publish to.
+   * @param configuration consulted for the "hbase.canary.sink.class" setting
+   * @param clazz sink class used when that setting is absent
+   * @return the already-set sink when there is one, otherwise a new instance of the resolved class
+   */
+  Sink getSink(Configuration configuration, Class clazz) {
+    // In test context, this.sink might be set. Use it if non-null. For testing.
+    if (this.sink != null) {
+      return this.sink;
+    }
+    return (Sink) ReflectionUtils.newInstance(
+      configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
+  }
+
+  /**
+   * Region-mode bookkeeping for one region that the canary scans: the region itself, the table
+   * and server it was located on, and the outcome and latency of its read and write probes.
+   */
+  public static class RegionTaskResult {
+    private RegionInfo region;
+    private TableName tableName;
+    private ServerName serverName;
+    // Both latencies stay null until the matching setter is first called; the getters report -1
+    // while they are null.
+    private AtomicLong readLatency = null;
+    private AtomicLong writeLatency = null;
+    private boolean readSuccess = false;
+    private boolean writeSuccess = false;
+
+    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) {
+      this.region = region;
+      this.tableName = tableName;
+      this.serverName = serverName;
+    }
+
+    public RegionInfo getRegionInfo() {
+      return region;
+    }
+
+    public String getRegionNameAsString() {
+      return region.getRegionNameAsString();
+    }
+
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    public String getTableNameAsString() {
+      return tableName.getNameAsString();
+    }
+
+    public ServerName getServerName() {
+      return serverName;
+    }
+
+    public String getServerNameAsString() {
+      return serverName.getServerName();
+    }
+
+    /** @return the recorded read latency, or -1 when none has been recorded */
+    public long getReadLatency() {
+      return this.readLatency == null ? -1 : this.readLatency.get();
+    }
+
+    public void setReadLatency(long readLatency) {
+      if (this.readLatency == null) {
+        this.readLatency = new AtomicLong(readLatency);
+        return;
+      }
+      this.readLatency.set(readLatency);
+    }
+
+    /** @return the recorded write latency, or -1 when none has been recorded */
+    public long getWriteLatency() {
+      return this.writeLatency == null ? -1 : this.writeLatency.get();
+    }
+
+    public void setWriteLatency(long writeLatency) {
+      if (this.writeLatency == null) {
+        this.writeLatency = new AtomicLong(writeLatency);
+        return;
+      }
+      this.writeLatency.set(writeLatency);
+    }
+
+    public boolean isReadSuccess() {
+      return readSuccess;
+    }
+
+    public void setReadSuccess() {
+      readSuccess = true;
+    }
+
+    public boolean isWriteSuccess() {
+      return writeSuccess;
+    }
+
+    public void setWriteSuccess() {
+      writeSuccess = true;
+    }
+  }
+
+  /**
+   * A Factory method for {@link Monitor}.
+   * Returns a RegionServerMonitor in regionserver mode, a ZookeeperMonitor in zookeeper mode,
+   * and a RegionMonitor otherwise. All settings are read from {@code conf} up front; the sink
+   * is resolved through {@code getSink} with the connection's configuration.
+   * @param connection connection handed to the monitor
+   * @param monitorTargets targets handed to the monitor
+   * @return a Monitor instance
+   */
+  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
+    final boolean regexTargets = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
+    final boolean allRegionsPerServer =
+      conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
+    final boolean failureAsError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
+    final int allowedFailureCount = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
+    final boolean sniffWrites = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
+    final String canaryTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME,
+      DEFAULT_WRITE_TABLE_NAME.getNameAsString());
+    final long writeTimeout =
+      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
+
+    if (this.regionServerMode) {
+      return new RegionServerMonitor(connection, monitorTargets, regexTargets,
+        getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
+        this.executor, allRegionsPerServer, failureAsError, allowedFailureCount);
+    }
+    if (this.zookeeperMode) {
+      return new ZookeeperMonitor(connection, monitorTargets, regexTargets,
+        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
+        this.executor, failureAsError, allowedFailureCount);
+    }
+    return new RegionMonitor(connection, monitorTargets, regexTargets,
+      getSink(connection.getConfiguration(), RegionStdOutSink.class),
+      this.executor, sniffWrites, TableName.valueOf(canaryTableName), failureAsError,
+      configuredReadTableTimeouts, writeTimeout, allowedFailureCount);
+  }
+
+  /**
+   * Parses the value of the -readTableTimeouts option and stores every pair in
+   * {@code configuredReadTableTimeouts}.
+   * @param configuredReadTableTimeoutsStr comma-separated {@code <tableName>=<read timeout>}
+   *   pairs, without spaces
+   * @throws IllegalArgumentException if a pair has no '=' separated value, or if the timeout is
+   *   not a valid long (the NumberFormatException is attached as the cause)
+   */
+  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
+    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
+    for (String tT : tableTimeouts) {
+      String[] nameTimeout = tT.split("=");
+      if (nameTimeout.length < 2) {
+        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " +
+          "<tableName>=<read timeout> (without spaces).");
+      }
+      long timeoutVal;
+      try {
+        timeoutVal = Long.parseLong(nameTimeout[1]);
+      } catch (NumberFormatException e) {
+        // Keep the parse failure as the cause so the offending value shows up in the stack trace.
+        throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" +
+          " must be a numeric value argument.", e);
+      }
+      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
+    }
+  }
+  /**
+   * A Monitor super-class can be extended by users. It is a {@link Runnable} that records its
+   * progress in {@code initialized}, {@code done} and {@code errorCode}, and closes the
+   * {@link Admin} it obtained (if any) in {@link #close()}.
+   */
+  public static abstract class Monitor implements Runnable, Closeable {
+    protected Connection connection;
+    protected Admin admin;
+    /**
+     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
+     * Passed on the command-line as arguments.
+     */
+    protected String[] targets;
+    protected boolean useRegExp;
+    protected boolean treatFailureAsError;
+    protected boolean initialized = false;
+
+    protected boolean done = false;
+    protected int errorCode = 0;
+    protected long allowedFailures = 0;
+    protected Sink sink;
+    protected ExecutorService executor;
+
+    /**
+     * @throws IllegalArgumentException if {@code connection} is null
+     */
+    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
+        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
+      if (connection == null) {
+        throw new IllegalArgumentException("connection shall not be null");
+      }
+
+      this.connection = connection;
+      this.targets = monitorTargets;
+      this.useRegExp = useRegExp;
+      this.treatFailureAsError = treatFailureAsError;
+      this.sink = sink;
+      this.executor = executor;
+      this.allowedFailures = allowedFailures;
+    }
+
+    public boolean isDone() {
+      return done;
+    }
+
+    public boolean hasError() {
+      return errorCode != 0;
+    }
+
+    /**
+     * Final verdict for a run.
+     * @return true when an error code is already set, or when failures are treated as errors and
+     *   the sink's read or write failure count exceeds {@code allowedFailures} (in which case
+     *   the error code becomes FAILURE_EXIT_CODE); false otherwise
+     */
+    public boolean finalCheckForErrors() {
+      if (errorCode != 0) {
+        return true;
+      }
+      boolean tooManyFailures = treatFailureAsError
+        && (sink.getReadFailureCount() > allowedFailures
+          || sink.getWriteFailureCount() > allowedFailures);
+      if (!tooManyFailures) {
+        return false;
+      }
+      LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
+      errorCode = FAILURE_EXIT_CODE;
+      return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.admin != null) {
+        this.admin.close();
+      }
+    }
+
+    @Override
+    public abstract void run();
+
+    /**
+     * Obtains the Admin on first use; on later calls only checks that it has not aborted.
+     * @return true when no error is recorded after the attempt
+     */
+    protected boolean initAdmin() {
+      if (this.admin != null) {
+        if (admin.isAborted()) {
+          LOG.error("HBaseAdmin aborted");
+          this.errorCode = INIT_ERROR_EXIT_CODE;
+        }
+      } else {
+        try {
+          this.admin = this.connection.getAdmin();
+        } catch (Exception e) {
+          LOG.error("Initial HBaseAdmin failed...", e);
+          this.errorCode = INIT_ERROR_EXIT_CODE;
+        }
+      }
+      return !this.hasError();
+    }
+  }
+
+ /**
+ * A monitor for region mode.
+ */
+ private static class RegionMonitor extends Monitor {
+ // 10 minutes
+ private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
+ // 1 day, in seconds
+ private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
+
+ private long lastCheckTime = -1;
+ private boolean writeSniffing;
+ private TableName writeTableName;
+ private int writeDataTTL;
+ private float regionsLowerLimit;
+ private float regionsUpperLimit;
+ private int checkPeriod;
+ private boolean rawScanEnabled;
+
+ /**
+ * This is a timeout per table. If read of each region in the table aggregated takes longer
+ * than what is configured here, we log an ERROR rather than just an INFO.
+ */
+ private HashMap configuredReadTableTimeouts;
+
+ private long configuredWriteTableTimeout;
+
+ /**
+ * Creates a region-mode monitor. Besides the explicit arguments, the write-data TTL, the
+ * per-server region lower/upper limits, the write-table check period and the raw-scan flag
+ * are read from the connection's configuration.
+ * @param configuredReadTableTimeouts per-table read timeouts; copied into a new map
+ * @param configuredWriteTableTimeout timeout the write-table latency is compared against
+ */
+ public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
+ Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
+ boolean treatFailureAsError, HashMap configuredReadTableTimeouts,
+ long configuredWriteTableTimeout,
+ long allowedFailures) {
+ super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
+ allowedFailures);
+ Configuration conf = connection.getConfiguration();
+ this.writeSniffing = writeSniffing;
+ this.writeTableName = writeTableName;
+ this.writeDataTTL =
+ conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
+ this.regionsLowerLimit =
+ conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
+ this.regionsUpperLimit =
+ conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
+ this.checkPeriod =
+ conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
+ DEFAULT_WRITE_TABLE_CHECK_PERIOD);
+ this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
+ // Defensive copy: later changes to the caller's map do not affect this monitor.
+ this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
+ this.configuredWriteTableTimeout = configuredWriteTableTimeout;
+ }
+
+    /**
+     * @return the sink, narrowed to RegionStdOutSink
+     * @throws RuntimeException if the configured sink is of any other type
+     */
+    private RegionStdOutSink getSink() {
+      if (sink instanceof RegionStdOutSink) {
+        return (RegionStdOutSink) sink;
+      }
+      throw new RuntimeException("Can only write to Region sink");
+    }
+
+    /**
+     * Runs one region-mode pass: read-sniffs the resolved target tables (or, without targets,
+     * whatever {@code sniff(TaskType, RegionStdOutSink)} selects), write-sniffs the canary table
+     * when write sniffing is on, waits for every probe, and then logs the measured latencies
+     * against the configured timeouts. Failures are reported through {@code errorCode};
+     * {@code done} is always set before this method returns.
+     */
+    @Override
+    public void run() {
+      if (this.initAdmin()) {
+        try {
+          List<Future<Void>> taskFutures = new LinkedList<>();
+          RegionStdOutSink regionSink = this.getSink();
+          if (this.targets != null && this.targets.length > 0) {
+            String[] tables = generateMonitorTables(this.targets);
+            // Check to see that each table name passed in the -readTableTimeouts argument is also
+            // passed as a monitor target.
+            if (!new HashSet<>(Arrays.asList(tables)).
+                containsAll(this.configuredReadTableTimeouts.keySet())) {
+              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
+                "passed via command line.");
+              this.errorCode = USAGE_EXIT_CODE;
+              return;
+            }
+            this.initialized = true;
+            for (String table : tables) {
+              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
+              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
+                this.rawScanEnabled, readLatency));
+            }
+          } else {
+            // NOTE(review): 'initialized' is not set on this path; confirm whether that is
+            // intended, since callers distinguish init errors from later ones with it.
+            taskFutures.addAll(sniff(TaskType.READ, regionSink));
+          }
+
+          if (writeSniffing) {
+            // Re-check the canary table layout at most once per checkPeriod.
+            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
+              try {
+                checkWriteTableDistribution();
+              } catch (IOException e) {
+                LOG.error("Check canary table distribution failed!", e);
+              }
+              lastCheckTime = EnvironmentEdgeManager.currentTime();
+            }
+            // sniff canary table with write operation
+            regionSink.initializeWriteLatency();
+            LongAdder writeTableLatency = regionSink.getWriteLatency();
+            taskFutures.addAll(CanaryTool.sniff(admin, regionSink,
+              admin.getDescriptor(writeTableName), executor, TaskType.WRITE, this.rawScanEnabled,
+              writeTableLatency));
+          }
+
+          for (Future<Void> future : taskFutures) {
+            try {
+              future.get();
+            } catch (ExecutionException e) {
+              LOG.error("Sniff region failed!", e);
+            }
+          }
+          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
+          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
+            String tableName = entry.getKey();
+            if (actualReadTableLatency.containsKey(tableName)) {
+              long actual = actualReadTableLatency.get(tableName).longValue();
+              long configured = entry.getValue();
+              if (actual > configured) {
+                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout. " +
+                  "(Configured read timeout {}ms.)", tableName, actual, configured);
+              } else {
+                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.)",
+                  tableName, actual, configured);
+              }
+            } else {
+              LOG.error("Read operation for {} failed!", tableName);
+            }
+          }
+          if (this.writeSniffing) {
+            String writeTableStringName = this.writeTableName.getNameAsString();
+            long actualWriteLatency = regionSink.getWriteLatency().longValue();
+            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
+              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
+            // Check that the writeTable write operation latency does not exceed the configured
+            // timeout.
+            if (actualWriteLatency > this.configuredWriteTableTimeout) {
+              LOG.error("Write operation for {} exceeded the configured write timeout.",
+                writeTableStringName);
+            }
+          }
+        } catch (InterruptedException e) {
+          // Same outcome as any other failure, but restore the interrupt flag so the code
+          // running this monitor can still observe the interruption.
+          LOG.error("Run regionMonitor failed", e);
+          this.errorCode = ERROR_EXIT_CODE;
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Run regionMonitor failed", e);
+          this.errorCode = ERROR_EXIT_CODE;
+        } finally {
+          this.done = true;
+        }
+      }
+      // Also reached when initAdmin() failed and the try block never ran.
+      this.done = true;
+    }
+
+    /**
+     * Resolves the monitor targets to table names. Outside regex mode the targets are returned
+     * as given. In regex mode each target is compiled as a pattern and every listed table whose
+     * full name matches one of them is returned, sorted and without duplicates.
+     * @return List of tables to use in test.
+     * @throws TableNotFoundException in regex mode when nothing matches; errorCode is set to
+     *   INIT_ERROR_EXIT_CODE before it is thrown
+     * @throws IOException when listing the table descriptors fails
+     */
+    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
+      if (!this.useRegExp) {
+        return monitorTargets;
+      }
+
+      Set<String> matchedTables = new TreeSet<>();
+      try {
+        LOG.debug("reading list of tables");
+        // A null pattern is handed to the Pattern overload, exactly as before.
+        List<TableDescriptor> descriptors = this.admin.listTableDescriptors((Pattern) null);
+        if (descriptors == null) {
+          descriptors = Collections.emptyList();
+        }
+        for (String monitorTarget : monitorTargets) {
+          Pattern targetPattern = Pattern.compile(monitorTarget);
+          for (TableDescriptor descriptor : descriptors) {
+            if (targetPattern.matcher(descriptor.getTableName().getNameAsString()).matches()) {
+              matchedTables.add(descriptor.getTableName().getNameAsString());
+            }
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Communicate with admin failed", e);
+        throw e;
+      }
+
+      if (matchedTables.isEmpty()) {
+        String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
+        LOG.error(msg);
+        this.errorCode = INIT_ERROR_EXIT_CODE;
+        throw new TableNotFoundException(msg);
+      }
+      return matchedTables.toArray(new String[matchedTables.size()]);
+    }
+
+    /*
+     * Canary entry point to monitor all the tables. Tables that do not exist, are not enabled,
+     * or are the canary write table are skipped.
+     */
+    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
+        throws Exception {
+      LOG.debug("Reading list of tables");
+      List<Future<Void>> taskFutures = new LinkedList<>();
+      for (TableDescriptor td : admin.listTableDescriptors()) {
+        // Same checks, in the same order, as the positive form: exists, enabled, not write table.
+        if (!admin.tableExists(td.getTableName()) || !admin.isTableEnabled(td.getTableName())
+            || td.getTableName().equals(writeTableName)) {
+          continue;
+        }
+        LongAdder readLatency =
+          regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
+        taskFutures.addAll(
+          CanaryTool.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled, readLatency));
+      }
+      return taskFutures;
+    }
+
+ /**
+ * Makes sure the canary write table exists, is enabled, and has a region count within
+ * [regionsLowerLimit, regionsUpperLimit] times the number of live servers; re-creates the
+ * table when it does not, and asks for a balance when its regions cover fewer servers than
+ * are counted.
+ * @throws IllegalStateException if the table has to be created and no live server is reported
+ */
+ private void checkWriteTableDistribution() throws IOException {
+ if (!admin.tableExists(writeTableName)) {
+ int numberOfServers =
+ admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size();
+ if (numberOfServers == 0) {
+ throw new IllegalStateException("No live regionservers");
+ }
+ createWriteTable(numberOfServers);
+ }
+
+ if (!admin.isTableEnabled(writeTableName)) {
+ admin.enableTable(writeTableName);
+ }
+
+ ClusterMetrics status =
+ admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER));
+ int numberOfServers = status.getLiveServerMetrics().size();
+ // The master is not counted when it also appears among the live servers.
+ if (status.getLiveServerMetrics().containsKey(status.getMasterName())) {
+ numberOfServers -= 1;
+ }
+
+ List> pairs =
+ MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
+ int numberOfRegions = pairs.size();
+ if (numberOfRegions < numberOfServers * regionsLowerLimit
+ || numberOfRegions > numberOfServers * regionsUpperLimit) {
+ admin.disableTable(writeTableName);
+ admin.deleteTable(writeTableName);
+ createWriteTable(numberOfServers);
+ }
+ // NOTE(review): 'pairs' was fetched before the possible re-create above, so the coverage
+ // check below may look at the old table's locations; confirm this is acceptable.
+ HashSet serverSet = new HashSet<>();
+ for (Pair pair : pairs) {
+ serverSet.add(pair.getSecond());
+ }
+ int numberOfCoveredServers = serverSet.size();
+ if (numberOfCoveredServers < numberOfServers) {
+ admin.balance();
+ }
+ }
+
+    /**
+     * Creates the canary write table, pre-split with HexStringSplit into
+     * {@code (int) (numberOfServers * regionsLowerLimit)} regions. Its single column family
+     * keeps one version and uses writeDataTTL as time-to-live.
+     */
+    private void createWriteTable(int numberOfServers) throws IOException {
+      final int regionCount = (int) (numberOfServers * regionsLowerLimit);
+      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
+        "(current lower limit of regions per server is {} and you can change it with config {}).",
+        numberOfServers, regionCount, regionsLowerLimit,
+        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
+
+      HColumnDescriptor canaryFamily = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
+      canaryFamily.setMaxVersions(1);
+      canaryFamily.setTimeToLive(writeDataTTL);
+
+      HTableDescriptor tableDescriptor = new HTableDescriptor(writeTableName);
+      tableDescriptor.addFamily(canaryFamily);
+
+      byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(regionCount);
+      admin.createTable(tableDescriptor, splitKeys);
+    }
+ }
+
+  /**
+   * Canary entry point for specified table.
+   * @return the futures of the submitted probes, or an empty list when the table is not enabled
+   * @throws Exception if checking the table state, fetching its descriptor, or sniffing fails
+   */
+  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
+      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency)
+      throws Exception {
+    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
+    if (!admin.isTableEnabled(TableName.valueOf(tableName))) {
+      LOG.warn("Table {} is not enabled", tableName);
+      return new LinkedList<>();
+    }
+    return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
+      executor, taskType, rawScanEnabled, readLatency);
+  }
+
+  /*
+   * Loops over regions of this table, and outputs information about the state: one RegionTask
+   * is submitted per region location, and a RegionTaskResult is registered for that region in
+   * the sink's region map.
+   * Returns the futures of the submitted tasks, or an empty list when the table is not found.
+   * The sink must be a RegionStdOutSink; it is cast for every location.
+   */
+  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
+      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
+      boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
+    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
+    // NOTE(review): 'table' is opened but not otherwise used in this method; confirm whether
+    // it is still needed.
+    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
+      List<RegionTask> tasks = new ArrayList<>();
+      try (RegionLocator regionLocator =
+          admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
+        for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
+          ServerName rs = location.getServerName();
+          RegionInfo region = location.getRegion();
+          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
+            taskType, rawScanEnabled, rwLatency));
+          Map<String, RegionTaskResult> regionMap = ((RegionStdOutSink) sink).getRegionMap();
+          regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
+            region.getTable(), rs));
+        }
+        return executor.invokeAll(tasks);
+      }
+    } catch (TableNotFoundException e) {
+      // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Monitor for zookeeper mode: submits one ZookeeperTask per server address parsed from the
+   * configured ZooKeeper quorum string.
+   */
+  private static class ZookeeperMonitor extends Monitor {
+    private List<String> hosts;
+    private final String znode;
+    private final int timeout;
+
+    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
+        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
+      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
+        allowedFailures);
+      Configuration zkConf = connection.getConfiguration();
+      znode = zkConf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+      timeout =
+        zkConf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+
+      ConnectStringParser quorumParser =
+        new ConnectStringParser(ZKConfig.getZKQuorumServersString(zkConf));
+      hosts = Lists.newArrayList();
+      for (InetSocketAddress address : quorumParser.getServerAddresses()) {
+        hosts.add(address.toString());
+      }
+
+      // Only a warning: more permitted failures than a majority quorum can tolerate.
+      int tolerable = (hosts.size() - 1) / 2;
+      if (allowedFailures > tolerable) {
+        LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " +
+          "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
+          allowedFailures, hosts.size());
+      }
+    }
+
+    @Override
+    public void run() {
+      ZookeeperStdOutSink zkSink = null;
+      try {
+        zkSink = this.getSink();
+      } catch (RuntimeException e) {
+        // NOTE(review): the run continues with a null sink after this failure; confirm intent.
+        LOG.error("Run ZooKeeperMonitor failed!", e);
+        this.errorCode = ERROR_EXIT_CODE;
+      }
+      this.initialized = true;
+
+      List<ZookeeperTask> tasks = Lists.newArrayList();
+      for (final String host : hosts) {
+        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
+      }
+
+      try {
+        for (Future<Void> future : this.executor.invokeAll(tasks)) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            LOG.error("Sniff zookeeper failed!", e);
+            this.errorCode = ERROR_EXIT_CODE;
+          }
+        }
+      } catch (InterruptedException e) {
+        this.errorCode = ERROR_EXIT_CODE;
+        Thread.currentThread().interrupt();
+        LOG.error("Sniff zookeeper interrupted!", e);
+      }
+      this.done = true;
+    }
+
+    private ZookeeperStdOutSink getSink() {
+      if (sink instanceof ZookeeperStdOutSink) {
+        return (ZookeeperStdOutSink) sink;
+      }
+      throw new RuntimeException("Can only write to zookeeper sink");
+    }
+  }
+
+
+ /**
+ * A monitor for regionserver mode
+ */
+ private static class RegionServerMonitor extends Monitor {
+ private boolean allRegions;
+
+ /**
+ * Creates a regionserver-mode monitor.
+ * @param allRegions when true every region of a server is probed; otherwise one region per
+ * server is picked at random
+ */
+ public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
+ Sink sink, ExecutorService executor, boolean allRegions,
+ boolean treatFailureAsError, long allowedFailures) {
+ super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
+ allowedFailures);
+ this.allRegions = allRegions;
+ }
+
+    /**
+     * @return the sink, narrowed to RegionServerStdOutSink
+     * @throws RuntimeException if the configured sink is of any other type
+     */
+    private RegionServerStdOutSink getSink() {
+      if (sink instanceof RegionServerStdOutSink) {
+        return (RegionServerStdOutSink) sink;
+      }
+      throw new RuntimeException("Can only write to regionserver sink");
+    }
+
+    /**
+     * Runs one regionserver-mode pass. Nothing is probed when the Admin cannot be initialised
+     * or when a target turns out to be a table name; {@code done} is set in every case.
+     */
+    @Override
+    public void run() {
+      if (!this.initAdmin() || !this.checkNoTableNames()) {
+        this.done = true;
+        return;
+      }
+
+      RegionServerStdOutSink regionServerSink = null;
+      try {
+        regionServerSink = this.getSink();
+      } catch (RuntimeException e) {
+        // NOTE(review): monitoring continues with a null sink after this failure; confirm intent.
+        LOG.error("Run RegionServerMonitor failed!", e);
+        this.errorCode = ERROR_EXIT_CODE;
+      }
+      Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
+      this.initialized = true;
+      this.monitorRegionServers(rsAndRMap, regionServerSink);
+      this.done = true;
+    }
+
+    /**
+     * Verifies that no target is the name of an existing table; the usage message printed on a
+     * match states that table names cannot be passed with the -regionserver option.
+     * @return true when there are no targets or none of them matches a table name; false when a
+     *   match was found (errorCode becomes USAGE_EXIT_CODE) or when listing the table names
+     *   failed (errorCode becomes INIT_ERROR_EXIT_CODE)
+     */
+    private boolean checkNoTableNames() {
+      LOG.debug("Reading list of tables");
+      TableName[] existingTables;
+      try {
+        existingTables = this.admin.listTableNames();
+      } catch (IOException e) {
+        LOG.error("Get listTableNames failed", e);
+        this.errorCode = INIT_ERROR_EXIT_CODE;
+        return false;
+      }
+
+      if (this.targets == null || this.targets.length == 0) {
+        return true;
+      }
+
+      List<String> foundTableNames = new ArrayList<>();
+      for (String target : this.targets) {
+        for (TableName existing : existingTables) {
+          if (target.equals(existing.getNameAsString())) {
+            foundTableNames.add(target);
+          }
+        }
+      }
+
+      if (foundTableNames.isEmpty()) {
+        return true;
+      }
+      System.err.println("Cannot pass a tablename when using the -regionserver " +
+        "option, tablenames:" + foundTableNames.toString());
+      this.errorCode = USAGE_EXIT_CODE;
+      return false;
+    }
+
+    /**
+     * Submits the region server probes and waits for them. Every region of a server is probed
+     * when {@code allRegions} is set, otherwise one randomly chosen region per server; a server
+     * without regions is only logged as an error. A failed probe or an interruption sets
+     * errorCode to ERROR_EXIT_CODE.
+     * @param rsAndRMap regions to choose from, keyed by region server name
+     * @param regionServerSink sink handed to every RegionServerTask
+     */
+    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
+        RegionServerStdOutSink regionServerSink) {
+      List<RegionServerTask> tasks = new ArrayList<>();
+      Map<String, AtomicLong> successMap = new HashMap<>();
+      Random rand = new Random();
+      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
+        String serverName = entry.getKey();
+        AtomicLong successes = new AtomicLong(0);
+        successMap.put(serverName, successes);
+        if (entry.getValue().isEmpty()) {
+          LOG.error("Regionserver not serving any regions - {}", serverName);
+        } else if (this.allRegions) {
+          for (RegionInfo region : entry.getValue()) {
+            tasks.add(new RegionServerTask(this.connection,
+              serverName,
+              region,
+              regionServerSink,
+              successes));
+          }
+        } else {
+          // random select a region if flag not set
+          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
+          tasks.add(new RegionServerTask(this.connection,
+            serverName,
+            region,
+            regionServerSink,
+            successes));
+        }
+      }
+      try {
+        for (Future<Void> future : this.executor.invokeAll(tasks)) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            LOG.error("Sniff regionserver failed!", e);
+            this.errorCode = ERROR_EXIT_CODE;
+          }
+        }
+        if (this.allRegions) {
+          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
+            String serverName = entry.getKey();
+            LOG.info("Successfully read {} regions out of {} on regionserver {}",
+              successMap.get(serverName), entry.getValue().size(), serverName);
+          }
+        }
+      } catch (InterruptedException e) {
+        this.errorCode = ERROR_EXIT_CODE;
+        // Restore the interrupt flag, as ZookeeperMonitor.run() does, so the code running this
+        // monitor can still observe the interruption.
+        Thread.currentThread().interrupt();
+        LOG.error("Sniff regionserver interrupted!", e);
+      }
+    }
+
+    /**
+     * @return the region servers (with their regions) found by getAllRegionServerByName(),
+     *   narrowed by doFilterRegionServerByName()
+     */
+    private Map<String, List<RegionInfo>> filterRegionServerByName() {
+      return this.doFilterRegionServerByName(this.getAllRegionServerByName());
+    }
+
+    /**
+     * Groups the regions of every table by the hostname of the server they are located on, and
+     * adds live servers that host no region with an empty list. On an IOException the error is
+     * logged, errorCode becomes INIT_ERROR_EXIT_CODE, and whatever was collected so far is
+     * returned.
+     */
+    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
+      Map<String, List<RegionInfo>> regionsByHost = new HashMap<>();
+      try {
+        LOG.debug("Reading list of tables and locations");
+        for (TableDescriptor tableDesc : this.admin.listTableDescriptors()) {
+          try (RegionLocator locator =
+              this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
+            for (HRegionLocation location : locator.getAllRegionLocations()) {
+              String host = location.getServerName().getHostname();
+              regionsByHost.computeIfAbsent(host, k -> new ArrayList<>())
+                .add(location.getRegion());
+            }
+          }
+        }
+
+        // get any live regionservers not serving any regions
+        for (ServerName liveServer : this.admin
+            .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet()) {
+          regionsByHost.putIfAbsent(liveServer.getHostname(),
+            Collections.<RegionInfo> emptyList());
+        }
+      } catch (IOException e) {
+        LOG.error("Get HTables info failed", e);
+        this.errorCode = INIT_ERROR_EXIT_CODE;
+      }
+      return regionsByHost;
+    }
+
+    /**
+     * Narrows the given map to the servers named by the targets. Without targets the input map
+     * itself is returned. In regex mode each target is a pattern that must match the whole
+     * server name; otherwise a target must equal the server name. A target that selects nothing
+     * is only logged.
+     */
+    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
+        Map<String, List<RegionInfo>> fullRsAndRMap) {
+      if (this.targets == null || this.targets.length == 0) {
+        return fullRsAndRMap;
+      }
+
+      Map<String, List<RegionInfo>> selected = new HashMap<>();
+      for (String rsName : this.targets) {
+        if (!this.useRegExp) {
+          if (fullRsAndRMap.containsKey(rsName)) {
+            selected.put(rsName, fullRsAndRMap.get(rsName));
+          } else {
+            LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
+          }
+          continue;
+        }
+
+        Pattern serverPattern = Pattern.compile(rsName);
+        boolean matchedAny = false;
+        for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
+          if (serverPattern.matcher(entry.getKey()).matches()) {
+            selected.put(entry.getKey(), entry.getValue());
+            matchedAny = true;
+          }
+        }
+        if (!matchedAny) {
+          LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
+        }
+      }
+      return selected;
+    }
+ }
+
+  /**
+   * Command-line entry point: runs a CanaryTool through ToolRunner on a scheduled thread pool
+   * sized by "hbase.canary.threads.num", shuts the pool down, and exits with the tool's code.
+   */
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    final int threadCount = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
+    LOG.info("Execution thread count={}", threadCount);
+
+    final ExecutorService pool = new ScheduledThreadPoolExecutor(threadCount);
+    int exitCode;
+    try {
+      exitCode = ToolRunner.run(conf, new CanaryTool(pool), args);
+    } finally {
+      // Shut the pool down even when the run throws.
+      pool.shutdown();
+    }
+    System.exit(exitCode);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
index 79d526192038..dbbdf616edd8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
@@ -53,7 +53,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -120,8 +119,8 @@ public void testBasicCanaryWorks() throws Exception {
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", tableName.getNameAsString() };
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
assertEquals("verify no read error count", 0, canary.getReadFailures().size());
@@ -142,8 +141,8 @@ public void testCanaryRegionTaskResult() throws Exception {
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult" };
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
@@ -156,11 +155,11 @@ public void testCanaryRegionTaskResult() throws Exception {
assertTrue("canary should expect to scan at least 1 region",
sink.getTotalExpectedRegions() > 0);
- Map regionMap = sink.getRegionMap();
+ Map regionMap = sink.getRegionMap();
assertFalse("verify region map has size > 0", regionMap.isEmpty());
for (String regionName : regionMap.keySet()) {
- Canary.RegionTaskResult res = regionMap.get(regionName);
+ CanaryTool.RegionTaskResult res = regionMap.get(regionName);
assertNotNull("verify each expected region has a RegionTaskResult object in the map", res);
assertNotNull("verify getRegionNameAsString()", regionName);
assertNotNull("verify getRegionInfo()", res.getRegionInfo());
@@ -169,7 +168,7 @@ public void testCanaryRegionTaskResult() throws Exception {
assertNotNull("verify getServerName()", res.getServerName());
assertNotNull("verify getServerNameAsString()", res.getServerNameAsString());
- if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
+ if (regionName.contains(CanaryTool.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess());
assertTrue("write took some time", res.getWriteLatency() > -1);
} else {
@@ -180,7 +179,6 @@ public void testCanaryRegionTaskResult() throws Exception {
}
@Test
- @Ignore("Intermittent argument matching failures, see HBASE-18813")
public void testReadTableTimeouts() throws Exception {
final TableName [] tableNames = new TableName[2];
tableNames[0] = TableName.valueOf(name.getMethodName() + "1");
@@ -197,8 +195,8 @@ public void testReadTableTimeouts() throws Exception {
}
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
String configuredTimeoutStr = tableNames[0].getNameAsString() + "=" + Long.MAX_VALUE + "," +
tableNames[1].getNameAsString() + "=0";
String[] args = {"-readTableTimeouts", configuredTimeoutStr, name.getMethodName() + "1",
@@ -219,17 +217,16 @@ public boolean matches(LoggingEvent argument) {
verify(mockAppender, times(2)).doAppend(argThat(new ArgumentMatcher() {
@Override
public boolean matches(LoggingEvent argument) {
- return argument.getRenderedMessage().contains("The configured read timeout was");
+ return argument.getRenderedMessage().contains("Configured read timeout");
}
}));
}
@Test
- @Ignore("Intermittent argument matching failures, see HBASE-18813")
public void testWriteTableTimeout() throws Exception {
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-writeSniffing", "-writeTableTimeout", String.valueOf(Long.MAX_VALUE)};
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
assertNotEquals("verify non-null write latency", null, sink.getWriteLatency());
@@ -238,7 +235,7 @@ public void testWriteTableTimeout() throws Exception {
new ArgumentMatcher() {
@Override
public boolean matches(LoggingEvent argument) {
- return argument.getRenderedMessage().contains("The configured write timeout was");
+ return argument.getRenderedMessage().contains("Configured write timeout");
}
}));
}
@@ -281,8 +278,8 @@ public void testRawScanConfig() throws Exception {
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
String[] args = { "-t", "10000", name.getMethodName() };
org.apache.hadoop.conf.Configuration conf =
new org.apache.hadoop.conf.Configuration(testingUtility.getConfiguration());
@@ -296,7 +293,7 @@ public void testRawScanConfig() throws Exception {
private void runRegionserverCanary() throws Exception {
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
- Canary canary = new Canary(executor, new Canary.RegionServerStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, new CanaryTool.RegionServerStdOutSink());
String[] args = { "-t", "10000", "-regionserver"};
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
assertEquals("verify no read error count", 0, canary.getReadFailures().size());
@@ -307,8 +304,8 @@ private void testZookeeperCanaryWithArgs(String[] args) throws Exception {
Iterables.getOnlyElement(testingUtility.getZkCluster().getClientPortList(), null);
testingUtility.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + port);
ExecutorService executor = new ScheduledThreadPoolExecutor(2);
- Canary.ZookeeperStdOutSink sink = spy(new Canary.ZookeeperStdOutSink());
- Canary canary = new Canary(executor, sink);
+ CanaryTool.ZookeeperStdOutSink sink = spy(new CanaryTool.ZookeeperStdOutSink());
+ CanaryTool canary = new CanaryTool(executor, sink);
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
String baseZnode = testingUtility.getConfiguration()