diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java index 5ea6144d0376..e6db8f430173 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java @@ -21,12 +21,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -48,8 +46,6 @@ public class SlowLogTableAccessor { private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class); - private static Connection connection; - /** * hbase:slowlog table name - can be enabled with config - * hbase.regionserver.slowlog.systable.enabled @@ -66,10 +62,10 @@ private static void doPut(final Connection connection, final List puts) thr /** * Add slow/large log records to hbase:slowlog table * @param slowLogPayloads List of SlowLogPayload to process - * @param configuration Configuration to use for connection + * @param connection connection */ public static void addSlowLogRecords(final List slowLogPayloads, - final Configuration configuration) { + Connection connection) { List puts = new ArrayList<>(slowLogPayloads.size()); for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) { final byte[] rowKey = getRowKey(slowLogPayload); @@ -102,26 +98,12 @@ public static void addSlowLogRecords(final List slowL puts.add(put); } try { - if (connection == null) { - createConnection(configuration); - } doPut(connection, puts); } catch (Exception e) { LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e); } } - private static synchronized void createConnection(Configuration configuration) - throws IOException { - Configuration conf = new Configuration(configuration); - // rpc timeout: 20s - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000); - // retry count: 5 - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - connection = ConnectionFactory.createConnection(conf); - } - /** * Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep * records with sorted order of time, however records added at the very same time could be in @@ -140,5 +122,4 @@ private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) final long rowKeyLong = Long.parseLong(timeAndHashcode); return Bytes.toBytes(rowKeyLong); } - } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f4d43a2da291..0ab1bab31a15 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1552,6 +1552,14 @@ public enum OperationStatusCode { "hbase.regionserver.slowlog.systable.enabled"; public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false; + @Deprecated + // since and will be removed in + // Instead use hbase.regionserver.named.queue.chore.duration config property + public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY = + "hbase.slowlog.systable.chore.duration"; + // Default 10 mins. + public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000; + public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY = "hbase.shell.timestamp.format.epoch"; @@ -1567,6 +1575,22 @@ public enum OperationStatusCode { */ public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; + public static final String WAL_EVENT_TRACKER_ENABLED_KEY = + "hbase.regionserver.wal.event.tracker.enabled"; + public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false; + + public static final String NAMED_QUEUE_CHORE_DURATION_KEY = + "hbase.regionserver.named.queue.chore.duration"; + // 10 mins default. + public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000; + + /** The walEventTracker info family as a string */ + private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info"; + + /** The walEventTracker info family in array of bytes */ + public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY = + Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR); + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index dc94a6d3e683..ad9a820f83dd 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -2022,7 +2022,7 @@ possible configurations would overwhelm and obscure the important. hbase.namedqueue.provider.classes - org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService + org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService Default values for NamedQueueService implementors. This comma separated full class names represent all implementors of NamedQueueService that we would like to be invoked by diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java new file mode 100644 index 000000000000..8bd95aefe8e1 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsWALEventTrackerSource extends BaseSource { + /** + * The name of the metrics + */ + String METRICS_NAME = "WALEventTracker"; + + /** + * The name of the metrics context that metrics will be under. + */ + String METRICS_CONTEXT = "regionserver"; + + /** + * Description + */ + String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker"; + + /** + * The name of the metrics context that metrics will be under in jmx + */ + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String NUM_FAILED_PUTS = "numFailedPuts"; + String NUM_FAILED_PUTS_DESC = "Number of put requests that failed"; + + String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts"; + String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts"; + + /* + * Increment 2 counters, numFailedPuts and numRecordsFailedPuts + */ + void incrFailedPuts(long numRecords); + + /* + * Get the failed puts counter. + */ + long getFailedPuts(); + + /* + * Get the number of records in failed puts. + */ + long getNumRecordsFailedPuts(); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java new file mode 100644 index 000000000000..0ae5b12c4d6a --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl + implements MetricsWALEventTrackerSource { + + private final MutableFastCounter numFailedPutsCount; + private final MutableFastCounter numRecordsFailedPutsCount; + + public MetricsWALEventTrackerSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + numFailedPutsCount = + this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L); + numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS, + NUM_RECORDS_FAILED_PUTS_DESC, 0L); + } + + @Override + public void incrFailedPuts(long numRecords) { + numFailedPutsCount.incr(); + numRecordsFailedPutsCount.incr(numRecords); + } + + @Override + public long getFailedPuts() { + return numFailedPutsCount.value(); + } + + @Override + public long getNumRecordsFailedPuts() { + return numRecordsFailedPutsCount.value(); + } +} diff --git a/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource new file mode 100644 index 000000000000..5870bf1a9cf6 --- /dev/null +++ b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e4f2391771ae..df98f8f81e9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator; import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer; import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; import org.apache.hadoop.hbase.mob.MobFileCleanerChore; @@ -1228,6 +1229,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this); slowLogMasterService.init(); + WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this); + // clear the dead servers with same host name and port of online server because we are not // removing dead server with same hostname and port of rs which is trying to check in before // master initialization. See HBASE-5916. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java new file mode 100644 index 000000000000..a82e58660607 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.waleventtracker; + +import static org.apache.hadoop.hbase.HConstants.NO_NONCE; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * WALEventTracker Table creation to be used by HMaster + */ +@InterfaceAudience.Private +public final class WALEventTrackerTableCreator { + private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class); + private static final Long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds + + private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = + TableDescriptorBuilder.newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME) + .setRegionReplication(1).setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false) + .setMaxVersions(1).setTimeToLive(TTL.intValue()).build()); + + /* Private default constructor */ + private WALEventTrackerTableCreator() { + } + + /* + * We will create this table only if hbase.regionserver.wal.event.tracker.enabled is enabled and + * table doesn't exists already. + */ + public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) + throws IOException { + boolean walEventTrackerEnabled = conf.getBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY, + HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT); + if (!walEventTrackerEnabled) { + LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR + + " is disabled. Quitting."); + return; + } + if ( + !masterServices.getTableDescriptors() + .exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME) + ) { + LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating."); + masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java index ed4b470d577f..2d6f5bf57348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.yetus.audience.InterfaceAudience; @@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler { namedQueueServices.put(namedQueueService.getEvent(), namedQueueService); } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz); + LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz, + e); } } } @@ -105,8 +107,8 @@ boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. */ - void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - namedQueueServices.get(namedQueueEvent).persistAll(); + void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { + namedQueueServices.get(namedQueueEvent).persistAll(connection); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java index ba2eb3322d6e..39cc093b2aa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java @@ -29,7 +29,8 @@ public class NamedQueuePayload { public enum NamedQueueEvent { SLOW_LOG(0), BALANCE_DECISION(1), - BALANCE_REJECTION(2); + BALANCE_REJECTION(2), + WAL_EVENT_TRACKER(3); private final int value; @@ -48,6 +49,9 @@ public static NamedQueueEvent getEventByOrdinal(int value) { case 2: { return BALANCE_REJECTION; } + case 3: { + return WAL_EVENT_TRACKER; + } default: { throw new IllegalArgumentException( "NamedQueue event with ordinal " + value + " not defined"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java index 38f63fd09bec..6e88cf9cbc25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java @@ -22,6 +22,7 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.util.Threads; @@ -60,7 +61,7 @@ private NamedQueueRecorder(Configuration conf) { // disruptor initialization with BlockingWaitStrategy this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount), - new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d") + new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d") .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); @@ -137,9 +138,9 @@ public void addRecord(NamedQueuePayload namedQueuePayload) { * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. */ - public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { if (this.logEventHandler != null) { - this.logEventHandler.persistAll(namedQueueEvent); + this.logEventHandler.persistAll(namedQueueEvent, connection); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java index 889323d9592d..6154a7c2de35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.namequeues; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.yetus.audience.InterfaceAudience; @@ -57,6 +58,7 @@ public interface NamedQueueService { /** * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. + * @param connection connection */ - void persistAll(); + void persistAll(Connection connection); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java similarity index 66% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java index 0de6c8769895..b42baa328e47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +28,12 @@ * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table */ @InterfaceAudience.Private -public class SlowLogTableOpsChore extends ScheduledChore { +public class NamedQueueServiceChore extends ScheduledChore { - private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class); + private static final Logger LOG = LoggerFactory.getLogger(NamedQueueServiceChore.class); private final NamedQueueRecorder namedQueueRecorder; + private final Connection connection; /** * Chore Constructor @@ -41,21 +43,23 @@ public class SlowLogTableOpsChore extends ScheduledChore { * scheduled * @param namedQueueRecorder {@link NamedQueueRecorder} instance */ - public SlowLogTableOpsChore(final Stoppable stopper, final int period, - final NamedQueueRecorder namedQueueRecorder) { - super("SlowLogTableOpsChore", stopper, period); + public NamedQueueServiceChore(final Stoppable stopper, final int period, + final NamedQueueRecorder namedQueueRecorder, Connection connection) { + super("NamedQueueServiceChore", stopper, period); this.namedQueueRecorder = namedQueueRecorder; + this.connection = connection; } @Override protected void chore() { - if (LOG.isTraceEnabled()) { - LOG.trace("SlowLog Table Ops Chore is starting up."); - } - namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - if (LOG.isTraceEnabled()) { - LOG.trace("SlowLog Table Ops Chore is closing."); + for (NamedQueuePayload.NamedQueueEvent event : NamedQueuePayload.NamedQueueEvent.values()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Starting chore for event %s", event.name())); + } + namedQueueRecorder.persistAll(event, connection); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Stopping chore for event %s", event.name())); + } } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java index 95c1ed53f52c..b4104e6008f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java @@ -22,6 +22,7 @@ import java.util.Queue; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -66,7 +67,7 @@ public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) { /** * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch */ - public void addAllLogsToSysTable() { + public void addAllLogsToSysTable(Connection connection) { if (queueForSysTable == null) { LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting."); return; @@ -82,13 +83,13 @@ public void addAllLogsToSysTable() { slowLogPayloads.add(queueForSysTable.poll()); i++; if (i == SYSTABLE_PUT_BATCH_SIZE) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection); slowLogPayloads.clear(); i = 0; } } if (slowLogPayloads.size() > 0) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection); } } finally { LOCK.unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java new file mode 100644 index 000000000000..9f549a72e51a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class WALEventTrackerPayload extends NamedQueuePayload { + + private final String rsName; + private final String walName; + private final long timeStamp; + private final String state; + private final long walLength; + + public WALEventTrackerPayload(String rsName, String walName, long timeStamp, String state, + long walLength) { + super(NamedQueueEvent.WAL_EVENT_TRACKER.getValue()); + this.rsName = rsName; + this.walName = walName; + this.timeStamp = timeStamp; + this.state = state; + this.walLength = walLength; + } + + public String getRsName() { + return rsName; + } + + public String getWalName() { + return walName; + } + + public long getTimeStamp() { + return timeStamp; + } + + public String getState() { + return state; + } + + public long getWalLength() { + return walLength; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(this.getClass().getSimpleName()); + sb.append("["); + sb.append("rsName=").append(rsName); + sb.append(", walName=").append(walName); + sb.append(", timeStamp=").append(timeStamp); + sb.append(", walState=").append(state); + sb.append(", walLength=").append(walLength); + sb.append("]"); + return sb.toString(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java new file mode 100644 index 000000000000..40fb6033cc31 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Queue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; + +/* + This class provides the queue to save Wal events from backing RingBuffer. + */ +@InterfaceAudience.Private +public class WALEventTrackerQueueService implements NamedQueueService { + + private EvictingQueue queue; + private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE = + "hbase.regionserver.wal.event.tracker.ringbuffer.size"; + private final boolean walEventTrackerEnabled; + private int queueSize; + private MetricsWALEventTrackerSource source = null; + + private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class); + + public WALEventTrackerQueueService(Configuration conf) { + this(conf, null); + } + + public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) { + this.walEventTrackerEnabled = + conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); + if (!walEventTrackerEnabled) { + return; + } + + this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256); + queue = EvictingQueue.create(queueSize); + if (source == null) { + this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class); + } else { + this.source = source; + } + } + + @Override + public NamedQueuePayload.NamedQueueEvent getEvent() { + return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER; + } + + @Override + public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { + if (!walEventTrackerEnabled) { + return; + } + if (!(namedQueuePayload instanceof WALEventTrackerPayload)) { + LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type" + + " WALEventTrackerPayload."); + return; + } + + WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload; + if (LOG.isDebugEnabled()) { + LOG.debug("Adding wal event tracker payload " + payload); + } + addToQueue(payload); + } + + /* + * Made it default to use it in testing. + */ + synchronized void addToQueue(WALEventTrackerPayload payload) { + queue.add(payload); + } + + @Override + public boolean clearNamedQueue() { + if (!walEventTrackerEnabled) { + return false; + } + LOG.debug("Clearing wal event tracker queue"); + queue.clear(); + return true; + } + + @Override + public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + return null; + } + + @Override + public void persistAll(Connection connection) { + if (!walEventTrackerEnabled) { + return; + } + if (queue.isEmpty()) { + LOG.debug("Wal Event tracker queue is empty."); + return; + } + + Queue queue = getWALEventTrackerList(); + try { + WALEventTrackerTableAccessor.addWalEventTrackerRows(queue, connection); + } catch (Exception ioe) { + // If we fail to persist the records with retries then just forget about them. + // This is a best effort service. + LOG.error("Failed while persisting wal tracker records", ioe); + // Increment metrics for failed puts + source.incrFailedPuts(queue.size()); + } + } + + private synchronized Queue getWALEventTrackerList() { + Queue retQueue = new ArrayDeque<>(); + Iterator iterator = queue.iterator(); + while (iterator.hasNext()) { + retQueue.add(iterator.next()); + } + queue.clear(); + return retQueue; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java new file mode 100644 index 000000000000..51dc064a6202 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public final class WALEventTrackerTableAccessor { + private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableAccessor.class); + + public static final String RS_COLUMN = "region_server_name"; + public static final String WAL_NAME_COLUMN = "wal_name"; + public static final String TIMESTAMP_COLUMN = "timestamp"; + public static final String WAL_STATE_COLUMN = "wal_state"; + public static final String WAL_LENGTH_COLUMN = "wal_length"; + public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts"; + public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec"; + public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec"; + public static final int DEFAULT_MAX_ATTEMPTS = 3; + public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second + public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds + public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER"; + public static final String DELIMITER = "_"; + + private WALEventTrackerTableAccessor() { + } + + /** + * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - + * hbase.regionserver.wal.event.tracker.enabled + */ + public static final TableName WAL_EVENT_TRACKER_TABLE_NAME = + TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR); + + private static void doPut(final Connection connection, final List puts) throws Exception { + RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create(); + while (true) { + try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) { + table.put(puts); + return; + } catch (IOException ioe) { + retryOrThrow(retryCounter, ioe); + } + retryCounter.sleepUntilNextRetry(); + } + } + + private static RetryCounterFactory getRetryFactory(Configuration conf) { + int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS); + long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); + long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME); + RetryCounter.RetryConfig retryConfig = + new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs, + TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit()); + return new RetryCounterFactory(retryConfig); + } + + private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException { + if (retryCounter.shouldRetry()) { + return; + } + throw ioe; + } + + /** + * Add wal event tracker rows to hbase:waleventtracker table + * @param walEventPayloads List of walevents to process + * @param connection Connection to use. + */ + public static void addWalEventTrackerRows(Queue walEventPayloads, + final Connection connection) throws Exception { + List puts = new ArrayList<>(walEventPayloads.size()); + for (WALEventTrackerPayload payload : walEventPayloads) { + final byte[] rowKey = getRowKey(payload); + final Put put = new Put(rowKey); + // TODO Do we need to SKIP_WAL ? + put.setPriority(HConstants.NORMAL_QOS); + put + .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN), + Bytes.toBytes(payload.getRsName())) + .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN), + Bytes.toBytes(payload.getWalName())) + .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN), + Bytes.toBytes(payload.getTimeStamp())) + .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN), + Bytes.toBytes(payload.getState())) + .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN), + Bytes.toBytes(payload.getWalLength())); + puts.add(put); + } + doPut(connection, puts); + } + + /** + * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS + * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was + * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a + * unique rowkey. + * @param payload payload to process + * @return rowKey byte[] + */ + public static byte[] getRowKey(final WALEventTrackerPayload payload) { + String walName = payload.getWalName(); + // converting to string since this will help seeing the timestamp in string format using + // hbase shell commands. + String timestampStr = String.valueOf(payload.getTimeStamp()); + String walState = payload.getState(); + final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState; + return Bytes.toBytes(rowKeyStr); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java index 45bfca112700..885e2d44279c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.BalancerDecision; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; @@ -141,7 +142,7 @@ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) } @Override - public void persistAll() { + public void persistAll(Connection connection) { // no-op for now } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java index 79b7325b305d..fb94db2b917d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.BalancerRejection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; @@ -127,7 +128,7 @@ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) } @Override - public void persistAll() { + public void persistAll(Connection connection) { // no-op for now } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 03b6aa719ea9..86b24e9d975e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.SlowLogParams; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; @@ -223,12 +224,12 @@ private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) * table. */ @Override - public void persistAll() { + public void persistAll(Connection connection) { if (!isOnlineLogProviderEnabled) { return; } if (slowLogPersistentService != null) { - slowLogPersistentService.addAllLogsToSysTable(); + slowLogPersistentService.addAllLogsToSysTable(connection); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7dc474dcdb2a..7e87cde1eff5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -19,8 +19,12 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.NAMED_QUEUE_CHORE_DURATION_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; import java.io.IOException; @@ -107,7 +111,7 @@ import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; -import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; +import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; @@ -130,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; @@ -362,7 +368,7 @@ public class HRegionServer extends HBaseServerBase private final RegionServerAccounting regionServerAccounting; - private SlowLogTableOpsChore slowLogTableOpsChore = null; + private NamedQueueServiceChore namedQueueServiceChore = null; // Block cache private BlockCache blockCache; @@ -1688,9 +1694,23 @@ private void setupWALAndReplication() throws IOException { } // Instantiate replication if replication enabled. Pass it the log directories. createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); + + WALActionsListener walEventListener = getWALEventTrackerListener(conf); + if (walEventListener != null && factory.getWALProvider() != null) { + factory.getWALProvider().addWALActionsListener(walEventListener); + } this.walFactory = factory; } + private WALActionsListener getWALEventTrackerListener(Configuration conf) { + if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { + WALEventTrackerListener listener = + new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); + return listener; + } + return null; + } + /** * Start up replication source and sink handlers. */ @@ -1860,8 +1880,8 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OP if (this.fsUtilizationChore != null) { choreService.scheduleChore(fsUtilizationChore); } - if (this.slowLogTableOpsChore != null) { - choreService.scheduleChore(slowLogTableOpsChore); + if (this.namedQueueServiceChore != null) { + choreService.scheduleChore(namedQueueServiceChore); } if (this.brokenStoreFileCleaner != null) { choreService.scheduleChore(brokenStoreFileCleaner); @@ -1913,10 +1933,22 @@ private void initializeThreads() { final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); - if (isSlowLogTableEnabled) { + final boolean walEventTrackerEnabled = + conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); + + if (isSlowLogTableEnabled || walEventTrackerEnabled) { // default chore duration: 10 min - final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); - slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); + // After , we will remove hbase.slowlog.systable.chore.duration conf property + final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, + DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); + + final int namedQueueChoreDuration = + conf.getInt(HConstants.NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); + // Considering min of slowLogChoreDuration and namedQueueChoreDuration + int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); + + namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, + this.namedQueueRecorder, this.getConnection()); } if (this.nonceManager != null) { @@ -3498,13 +3530,7 @@ public List getMetaLocations() { @Override protected NamedQueueRecorder createNamedQueueRecord() { - final boolean isOnlineLogProviderEnabled = conf.getBoolean( - HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - if (isOnlineLogProviderEnabled) { - return NamedQueueRecorder.getInstance(conf); - } else { - return null; - } + return NamedQueueRecorder.getInstance(conf); } @Override @@ -3533,7 +3559,7 @@ protected void stopChores() { shutdownChore(executorStatusChore); shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); - shutdownChore(slowLogTableOpsChore); + shutdownChore(namedQueueServiceChore); shutdownChore(brokenStoreFileCleaner); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 2c0a656049c1..49fdb9748da1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -639,7 +639,7 @@ private Path getNewPath() throws IOException { return newPath; } - Path getOldPath() { + public Path getOldPath() { long currentFilenum = this.filenum.get(); Path oldPath = null; if (currentFilenum > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java new file mode 100644 index 000000000000..487c7de41707 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class WALEventTrackerListener implements WALActionsListener { + private final Configuration conf; + private final NamedQueueRecorder namedQueueRecorder; + private final String serverName; + + public enum WalState { + ROLLING, + ROLLED, + ACTIVE + } + + public WALEventTrackerListener(Configuration conf, NamedQueueRecorder namedQueueRecorder, + ServerName serverName) { + this.conf = conf; + this.namedQueueRecorder = namedQueueRecorder; + this.serverName = serverName.getHostname(); + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) { + if (oldPath != null) { + // oldPath can be null for first wal + // Just persist the last component of path not the whole walName which includes filesystem + // scheme, walDir. + WALEventTrackerPayload payloadForOldPath = + getPayload(oldPath.getName(), WalState.ROLLING.name(), 0L); + this.namedQueueRecorder.addRecord(payloadForOldPath); + } + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) { + // Create 2 entries entry in RingBuffer. + // 1. Change state to Rolled for oldPath + // 2. Change state to Active for newPath. + if (oldPath != null) { + // oldPath can be null for first wal + // Just persist the last component of path not the whole walName which includes filesystem + // scheme, walDir. + + long fileLength = 0L; + try { + FileSystem fs = oldPath.getFileSystem(this.conf); + fileLength = fs.getFileStatus(oldPath).getLen(); + } catch (IOException ioe) { + // Saving wal length is best effort. In case of any exception just ignore. + } + WALEventTrackerPayload payloadForOldPath = + getPayload(oldPath.getName(), WalState.ROLLED.name(), fileLength); + this.namedQueueRecorder.addRecord(payloadForOldPath); + } + + WALEventTrackerPayload payloadForNewPath = + getPayload(newPath.getName(), WalState.ACTIVE.name(), 0L); + this.namedQueueRecorder.addRecord(payloadForNewPath); + } + + private WALEventTrackerPayload getPayload(String path, String state, long walLength) { + long timestamp = EnvironmentEdgeManager.currentTime(); + WALEventTrackerPayload payload = + new WALEventTrackerPayload(serverName, path, timestamp, state, walLength); + return payload; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java new file mode 100644 index 000000000000..1a87effa8d56 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALEventTracker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEventTracker.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); + private static HBaseTestingUtil TEST_UTIL; + public static Configuration CONF; + + @BeforeClass + public static void setup() throws Exception { + CONF = HBaseConfiguration.create(); + CONF.setBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY, true); + // Set the chore for less than a second. + CONF.setInt(HConstants.NAMED_QUEUE_CHORE_DURATION_KEY, 900); + CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100); + TEST_UTIL = new HBaseTestingUtil(CONF); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardown() throws Exception { + LOG.info("Calling teardown"); + TEST_UTIL.shutdownMiniHBaseCluster(); + } + + @Before + public void waitForWalEventTrackerTableCreation() { + Waiter.waitFor(CONF, 10000, + (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); + } + + @Test + public void testWALRolling() throws Exception { + Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection(); + waitForWALEventTrackerTable(connection); + List wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs(); + assertEquals(1, wals.size()); + AbstractFSWAL wal = (AbstractFSWAL) wals.get(0); + Path wal1Path = wal.getOldPath(); + wal.rollWriter(true); + + FileSystem fs = TEST_UTIL.getTestFileSystem(); + long wal1Length = fs.getFileStatus(wal1Path).getLen(); + Path wal2Path = wal.getOldPath(); + String hostName = + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname(); + + TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3); + List walEventsList = getRows(hostName, connection); + + // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the + // time we will lose ACTIVE event for the first wal creates since hmaster will take some time + // to create hbase:waleventtracker table and by that time RS will already create the first wal + // and will try to persist it. + compareEvents(hostName, wal1Path.getName(), walEventsList, + new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(), + WALEventTrackerListener.WalState.ROLLED.name())), + false); + + // There should be only 1 event for wal2Name which is current wal, with ACTIVE state + compareEvents(hostName, wal2Path.getName(), walEventsList, + new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true); + + // Check that event with wal1Path and state ROLLED has the wal length set. + checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length); + } + + private void checkWALRolledEventHasSize(List walEvents, String walName, + long actualSize) { + List eventsFilteredByNameState = new ArrayList<>(); + // Filter the list by walName and wal state. + for (WALEventTrackerPayload event : walEvents) { + if ( + walName.equals(event.getWalName()) + && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState()) + ) { + eventsFilteredByNameState.add(event); + } + } + + assertEquals(1, eventsFilteredByNameState.size()); + // We are not comparing the size of the WAL in the tracker table with actual size. + // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length + // will always be incorrect. + // For FSHLog implementation, we close the WAL in an executor thread. So there will always be + // a difference of trailer size bytes. + // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength()); + } + + /** + * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME} + * @param hostName hostname + * @param walName walname + * @param walEvents event from table + * @param expectedStates expected states for the hostname and wal name + * @param strict whether to check strictly or not. Sometimes we lose the ACTIVE state + * event for the first wal since it takes some time for hmaster to create + * the table and by that time RS already creates the first WAL and will try + * to persist ACTIVE event to waleventtracker table. + */ + private void compareEvents(String hostName, String walName, + List walEvents, List expectedStates, boolean strict) { + List eventsFilteredByWalName = new ArrayList<>(); + + // Assert that all the events have the same host name i.e they came from the same RS. + for (WALEventTrackerPayload event : walEvents) { + assertEquals(hostName, event.getRsName()); + } + + // Filter the list by walName. + for (WALEventTrackerPayload event : walEvents) { + if (walName.equals(event.getWalName())) { + eventsFilteredByWalName.add(event); + } + } + + // Assert that the list of events after filtering by walName should be same as expected states. + if (strict) { + assertEquals(expectedStates.size(), eventsFilteredByWalName.size()); + } + + for (WALEventTrackerPayload event : eventsFilteredByWalName) { + expectedStates.remove(event.getState()); + } + assertEquals(0, expectedStates.size()); + } + + private void waitForWALEventTrackerTable(Connection connection) throws IOException { + TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); + } + + private List getRows(String rowKeyPrefix, Connection connection) + throws IOException { + List list = new ArrayList<>(); + Scan scan = new Scan(); + scan.withStartRow(Bytes.toBytes(rowKeyPrefix)); + Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); + ResultScanner scanner = table.getScanner(scan); + + Result r; + while ((r = scanner.next()) != null) { + List cells = r.listCells(); + list.add(getPayload(cells)); + } + return list; + } + + private WALEventTrackerPayload getPayload(List cells) { + String rsName = null, walName = null, walState = null; + long timestamp = 0L, walLength = 0L; + for (Cell cell : cells) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + String qualifierStr = Bytes.toString(qualifier); + + if (RS_COLUMN.equals(qualifierStr)) { + rsName = Bytes.toString(value); + } else if (WAL_NAME_COLUMN.equals(qualifierStr)) { + walName = Bytes.toString(value); + } else if (WAL_STATE_COLUMN.equals(qualifierStr)) { + walState = Bytes.toString(value); + } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) { + timestamp = Bytes.toLong(value); + } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) { + walLength = Bytes.toLong(value); + } + } + return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength); + } + + private int getTableCount(Connection connection) throws Exception { + Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); + ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + LOG.info("Table count: " + count); + return count; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java new file mode 100644 index 000000000000..397cda5a9e35 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestWALEventTrackerTableAccessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEventTrackerTableAccessor.class); + + /* + * Tests that rowkey is getting constructed correctly. + */ + @Test + public void testRowKey() { + String rsName = "test-region-server"; + String walName = "test-wal-0"; + long timeStamp = EnvironmentEdgeManager.currentTime(); + String walState = WALEventTrackerListener.WalState.ACTIVE.name(); + long walLength = 100L; + WALEventTrackerPayload payload = + new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength); + byte[] rowKeyBytes = WALEventTrackerTableAccessor.getRowKey(payload); + + String rowKeyBytesStr = Bytes.toString(rowKeyBytes); + String[] fields = rowKeyBytesStr.split(WALEventTrackerTableAccessor.DELIMITER); + // This is the format of rowkey: walName_timestamp_walState; + assertEquals(walName, fields[0]); + assertEquals(timeStamp, Long.valueOf(fields[1]).longValue()); + assertEquals(walState, fields[2]); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java new file mode 100644 index 000000000000..55cb0145de7e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(SmallTests.class) +public class TestWalEventTrackerQueueService { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWalEventTrackerQueueService.class); + + @Rule + public TestName name = new TestName(); + + /* + * Test whether wal event tracker metrics are being incremented. + */ + @Test + public void testMetrics() throws Exception { + String rsName = "test-region-server"; + String walName = "test-wal-0"; + long timeStamp = EnvironmentEdgeManager.currentTime(); + String walState = WALEventTrackerListener.WalState.ACTIVE.name(); + long walLength = 100L; + WALEventTrackerPayload payload = + new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength); + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY, true); + conf.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100); + MetricsWALEventTrackerSourceImpl source = new MetricsWALEventTrackerSourceImpl( + name.getMethodName(), name.getMethodName(), name.getMethodName(), name.getMethodName()); + WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf, source); + service.addToQueue(payload); + Connection mockConnection = mock(Connection.class); + doReturn(conf).when(mockConnection).getConfiguration(); + // Always throw IOException whenever mock connection is being used. + doThrow(new IOException()).when(mockConnection).getTable(WAL_EVENT_TRACKER_TABLE_NAME); + assertEquals(0L, source.getFailedPuts()); + assertEquals(0L, source.getNumRecordsFailedPuts()); + // Persist all the events. + service.persistAll(mockConnection); + assertEquals(1L, source.getFailedPuts()); + assertEquals(1L, source.getNumRecordsFailedPuts()); + // Verify that we tried MAX_RETRY_ATTEMPTS retry attempts to persist. + verify(mockConnection, times(1 + WALEventTrackerTableAccessor.DEFAULT_MAX_ATTEMPTS)) + .getTable(WAL_EVENT_TRACKER_TABLE_NAME); + } +}