HBASE-28819 CanaryTool: Create canary tables in different RSgroup and… #6235

Status: Open. wangxin97-02 wants to merge 1 commit into apache:master from wangxin97-02:HBASE-28819-2.
+119 −87, showing changes from all commits (1 commit). The diff below is to `CanaryTool.java`.
```diff
@@ -28,7 +28,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -48,18 +47,17 @@
 import java.util.concurrent.atomic.LongAdder;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.ClusterMetrics;
-import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerName;
@@ -75,17 +73,19 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.http.InfoServer;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
@@ -303,6 +303,7 @@ public void publishReadTiming(String znode, String server, long msTime) {
    */
  public static class RegionStdOutSink extends StdOutSink {
    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
+    private Map<String, LongAdder> perTableWriteLatency = new HashMap<>();
    private LongAdder writeLatency = new LongAdder();
    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
      new ConcurrentHashMap<>();
```
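A note on the data structure: each table gets its own `LongAdder`, which many concurrent sniff tasks can bump without contention, and the reporting pass sums it afterwards. A minimal, self-contained sketch of that pattern (the table name here is hypothetical; the patch itself pre-populates a plain `HashMap` from a single thread before the tasks run):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class PerTableLatencySketch {
  // One LongAdder per table; add() stays cheap under contention, sum() is read at report time.
  private static final Map<String, LongAdder> perTableWriteLatency = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    // Two sniff tasks recording write latencies against the same (hypothetical) canary table.
    perTableWriteLatency.computeIfAbsent("hbase:Canary_group1", k -> new LongAdder()).add(42L);
    perTableWriteLatency.computeIfAbsent("hbase:Canary_group1", k -> new LongAdder()).add(13L);
    perTableWriteLatency.forEach((table, latency) ->
      System.out.println(table + " accumulated write latency: " + latency.sum() + "ms"));
  }
}
```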
```diff
@@ -400,12 +401,22 @@ public Map<String, LongAdder> getReadLatencyMap() {
      return this.perTableReadLatency;
    }

+    public Map<String, LongAdder> getWriteLatencyMap() {
+      return this.perTableWriteLatency;
+    }
+
    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
      LongAdder initLatency = new LongAdder();
      this.perTableReadLatency.put(tableName, initLatency);
      return initLatency;
    }

+    public LongAdder initializeAndGetWriteLatencyForTable(String tableName) {
+      LongAdder initLatency = new LongAdder();
+      this.perTableWriteLatency.put(tableName, initLatency);
+      return initLatency;
+    }
+
    public void initializeWriteLatency() {
      this.writeLatency.reset();
    }
@@ -777,8 +788,10 @@ public Void call() {

  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

-  public static final TableName DEFAULT_WRITE_TABLE_NAME =
-    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
+  public static final String DEFAULT_WRITE_TABLE_NAME_PREFIX = "Canary";
+  public static final String DEFAULT_WRITE_TABLE_NAME_WITH_NAMESPACE_PREFIX =
+    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":"
+      + CanaryTool.DEFAULT_WRITE_TABLE_NAME_PREFIX;

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
```
```diff
@@ -810,8 +823,6 @@ public Void call() {
    "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
    "hbase.canary.region.write.table.timeout";
-  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
-    "hbase.canary.region.write.table.name";
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
    "hbase.canary.region.read.table.timeout";
@@ -938,14 +949,6 @@ private int parseArgs(String[] args) {
          printUsageAndExit();
        }
        conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
-      } else if (cmd.equals("-writeTable")) {
-        i++;
-
-        if (i == args.length) {
-          System.err.println("-writeTable takes a string tablename value argument.");
-          printUsageAndExit();
-        }
-        conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
      } else if (cmd.equals("-f")) {
        i++;
        if (i == args.length) {
```
```diff
@@ -1131,8 +1134,8 @@ private void printUsageAndExit() {
    System.err.println(" -f <B> exit on first error; default=true");
    System.err.println(" -failureAsError treat read/write failure as error");
    System.err.println(" -t <N> timeout for canary-test run; default=600000ms");
-    System.err.println(" -writeSniffing enable write sniffing");
-    System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary");
+    System.err.println(" -writeSniffing enable write sniffing, each rsgroup will create a default"
+      + " table, which tablename is hbase:Canary_rsgroupname");
    System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms");
    System.err.println(
      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
```

Contributor comment on the removed `-writeTable` option: I do not think we should remove this flag? People may want to use this flag to specify a different write table for canary.
```diff
@@ -1278,8 +1281,6 @@ private Monitor newMonitor(final Connection connection, String[] monitorTargets)
    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
-    String writeTableName =
-      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
    long configuredWriteTableTimeout =
      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);

@@ -1295,8 +1296,8 @@ private Monitor newMonitor(final Connection connection, String[] monitorTargets)
    } else {
      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
-        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
-        configuredWriteTableTimeout, permittedFailures);
+        writeSniffing, failOnError, configuredReadTableTimeouts, configuredWriteTableTimeout,
+        permittedFailures);
    }
    return monitor;
  }
@@ -1416,7 +1417,6 @@ private static class RegionMonitor extends Monitor {

    private long lastCheckTime = -1;
    private boolean writeSniffing;
-    private TableName writeTableName;
    private int writeDataTTL;
    private float regionsLowerLimit;
    private float regionsUpperLimit;
@@ -1433,14 +1433,13 @@ private static class RegionMonitor extends Monitor {
    private long configuredWriteTableTimeout;

    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
-      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
-      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
-      long configuredWriteTableTimeout, long allowedFailures) {
+      Sink sink, ExecutorService executor, boolean writeSniffing, boolean treatFailureAsError,
+      HashMap<String, Long> configuredReadTableTimeouts, long configuredWriteTableTimeout,
+      long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
-      this.writeTableName = writeTableName;
      this.writeDataTTL =
        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
```
```diff
@@ -1493,20 +1492,22 @@ public void run() {
        }

        if (writeSniffing) {
+          Set<TableName> canaryTableList = null;
          if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
            try {
-              checkWriteTableDistribution();
+              canaryTableList = checkWriteTableDistribution();
            } catch (IOException e) {
              LOG.error("Check canary table distribution failed!", e);
            }
            lastCheckTime = EnvironmentEdgeManager.currentTime();
          }
-          // sniff canary table with write operation
-          regionSink.initializeWriteLatency();
-          LongAdder writeTableLatency = regionSink.getWriteLatency();
-          taskFutures
-            .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
-              executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
+          for (TableName tableName : canaryTableList) {
+            // sniff canary table with write operation
+            LongAdder writeLatency = regionSink
+              .initializeAndGetWriteLatencyForTable(tableName.getNameWithNamespaceInclAsString());
+            taskFutures.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(tableName),
+              executor, TaskType.WRITE, this.rawScanEnabled, writeLatency, readAllCF));
+          }
        }

        for (Future<Void> future : taskFutures) {
```
```diff
@@ -1534,15 +1535,19 @@ public void run() {
          }
        }
        if (this.writeSniffing) {
-          String writeTableStringName = this.writeTableName.getNameAsString();
-          long actualWriteLatency = regionSink.getWriteLatency().longValue();
-          LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
-            writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
-          // Check that the writeTable write operation latency does not exceed the configured
-          // timeout.
-          if (actualWriteLatency > this.configuredWriteTableTimeout) {
-            LOG.error("Write operation for {} exceeded the configured write timeout.",
-              writeTableStringName);
+          Map<String, LongAdder> actualWriteTableLatency = regionSink.getWriteLatencyMap();
+          for (Map.Entry<String, LongAdder> entry : actualWriteTableLatency.entrySet()) {
+            String tableName = entry.getKey();
+            long actual = entry.getValue().longValue();
+            if (actual > configuredWriteTableTimeout) {
+              LOG.error(
+                "Write operation for {} took {}ms exceeded the configured write timeout."
+                  + "(Configured write timeout {}ms.",
+                tableName, actual, configuredWriteTableTimeout);
+            } else {
+              LOG.info("Write operation for {} took {}ms (Configured write timeout {}ms.",
+                tableName, actual, configuredWriteTableTimeout);
+            }
          }
        }
      } catch (Exception e) {
@@ -1607,7 +1612,8 @@ private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
      for (TableDescriptor td : admin.listTableDescriptors()) {
        if (
          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
-            && (!td.getTableName().equals(writeTableName))
+            && (!td.getTableName().getNameWithNamespaceInclAsString()
+              .startsWith(DEFAULT_WRITE_TABLE_NAME_WITH_NAMESPACE_PREFIX))
        ) {
          LongAdder readLatency =
            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
```
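The read-sniffing filter above skips all canary tables by prefix-matching the namespace-inclusive name. The distinction matters because `getNameAsString()` omits the `default` namespace while `getNameWithNamespaceInclAsString()` always includes one; a quick sketch (my reading of the `TableName` API, worth verifying):

```java
import org.apache.hadoop.hbase.TableName;

public class TableNameFormsSketch {
  public static void main(String[] args) {
    TableName sys = TableName.valueOf("hbase", "Canary_group1");
    TableName user = TableName.valueOf("usertable"); // lands in the default namespace
    System.out.println(sys.getNameAsString());                   // hbase:Canary_group1
    System.out.println(user.getNameAsString());                  // usertable
    System.out.println(user.getNameWithNamespaceInclAsString()); // default:usertable
  }
}
```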
```diff
@@ -1618,60 +1624,82 @@ private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
      return taskFutures;
    }

-    private void checkWriteTableDistribution() throws IOException {
-      if (!admin.tableExists(writeTableName)) {
-        int numberOfServers = admin.getRegionServers().size();
-        if (numberOfServers == 0) {
-          throw new IllegalStateException("No live regionservers");
-        }
-        createWriteTable(numberOfServers);
-      }
-
-      if (!admin.isTableEnabled(writeTableName)) {
-        admin.enableTable(writeTableName);
-      }
-
-      ClusterMetrics status =
-        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
-      int numberOfServers = status.getServersName().size();
-      if (status.getServersName().contains(status.getMasterName())) {
-        numberOfServers -= 1;
-      }
-
-      List<Pair<RegionInfo, ServerName>> pairs =
-        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
-      int numberOfRegions = pairs.size();
-      if (
-        numberOfRegions < numberOfServers * regionsLowerLimit
-          || numberOfRegions > numberOfServers * regionsUpperLimit
-      ) {
-        admin.disableTable(writeTableName);
-        admin.deleteTable(writeTableName);
-        createWriteTable(numberOfServers);
-      }
-      HashSet<ServerName> serverSet = new HashSet<>();
-      for (Pair<RegionInfo, ServerName> pair : pairs) {
-        serverSet.add(pair.getSecond());
-      }
-      int numberOfCoveredServers = serverSet.size();
-      if (numberOfCoveredServers < numberOfServers) {
-        admin.balance();
-      }
-    }
+    private Set<TableName> checkWriteTableDistribution() throws IOException {
+      Set<TableName> canaryTableList = new HashSet<>();
+      ClusterMetrics clusterMetrics = admin.getClusterMetrics();
+      Map<String, Integer> groupNameServerNumsMap = new HashMap<>();
+      for (ServerName serverName : clusterMetrics.getServersName()) {
+        RSGroupInfo rsGroup =
+          admin.getRSGroup(Address.fromParts(serverName.getHostname(), serverName.getPort()));
+        if (rsGroup == null) {
+          continue;
+        }
+        String groupName = rsGroup.getName();
+        Integer count = groupNameServerNumsMap.get(groupName);
+        if (count == null) {
+          groupNameServerNumsMap.put(groupName, 1);
+        } else {
+          groupNameServerNumsMap.put(groupName, ++count);
+        }
+      }
+
+      if (groupNameServerNumsMap.isEmpty()) {
+        throw new IllegalStateException("No live regionservers");
+      }
+
+      Map<TableName, RegionStatesCount> tableStates = clusterMetrics.getTableRegionStatesCount();
+      for (Map.Entry<String, Integer> entry : groupNameServerNumsMap.entrySet()) {
+        String group = entry.getKey();
+        int numberOfServers = entry.getValue();
+        String tableName =
+          DEFAULT_WRITE_TABLE_NAME_PREFIX + (StringUtils.isBlank(group) ? "" : ("_" + group));
+        TableName canaryTable =
+          TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, tableName);
+        RegionStatesCount states = tableStates.get(canaryTable);
+        int regions = 0;
+        if (states == null) {
+          createWriteTable(canaryTable, group, numberOfServers);
+        } else {
+          if (states.getTotalRegions() <= 0) {
+            admin.enableTable(canaryTable);
+            regions = admin.getRegions(canaryTable).size();
+          } else {
+            regions = states.getTotalRegions();
+          }
+          if (
+            regions < numberOfServers * regionsLowerLimit
+              || regions > numberOfServers * regionsUpperLimit
+          ) {
+            admin.disableTable(canaryTable);
+            admin.deleteTable(canaryTable);
+            createWriteTable(canaryTable, group, numberOfServers);
+          }
+        }
+        canaryTableList.add(canaryTable);
+      }
+      return canaryTableList;
+    }

-    private void createWriteTable(int numberOfServers) throws IOException {
+    private void createWriteTable(TableName tableName, String group, int numberOfServers)
+      throws IOException {
      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
-      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
-        + "(current lower limit of regions per server is {} and you can change it with config {}).",
+      LOG.info(
+        "Number of live regionservers {} "
+          + (StringUtils.isBlank(group) ? "" : (" of group " + group))
+          + ", pre-splitting the canary table into {} regions (current"
+          + " lower limit of regions per server is {} and you can change it with config {}).",
        numberOfServers, numberOfRegions, regionsLowerLimit,
        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
      ColumnFamilyDescriptor family =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
-      TableDescriptor desc =
-        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
+      TableDescriptorBuilder tableDescriptorBuilder =
+        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(family);
+      if (StringUtils.isNotBlank(group) && !group.equals("default")) {
+        tableDescriptorBuilder.setRegionServerGroup(group);
+      }
      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
-      admin.createTable(desc, splits);
+      admin.createTable(tableDescriptorBuilder.build(), splits);
    }
  }
```
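The pivotal API in the new `createWriteTable()` is `TableDescriptorBuilder.setRegionServerGroup(...)`, which records the RSGroup in the table descriptor so the table's regions are assigned within that group. A minimal sketch, assuming an `Admin` handle and an already-existing group (names are hypothetical):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class GroupedTableSketch {
  // Create a canary-style table pinned to an RSGroup (the group must already exist).
  static void createGroupedCanaryTable(Admin admin, String group) throws IOException {
    TableDescriptor desc = TableDescriptorBuilder
      .newBuilder(TableName.valueOf("hbase", "Canary_" + group))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("Test"))
      .setRegionServerGroup(group) // available in recent HBase client APIs (2.4+/3.x)
      .build();
    admin.createTable(desc);
  }
}
```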
Reviewer comment: Do we still need the global writeLatency below?