diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
index 0a701cd3ad63..df7d91767aa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +70,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
   private long splitPlanCount;
   private long mergePlanCount;
   private final AtomicLong cumulativePlansSizeLimitMb;
+  private final OffPeakHours offPeakHours;
 
   RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
     final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -81,6 +84,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
     this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
     this.cumulativePlansSizeLimitMb = new AtomicLong(
       configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
+    this.offPeakHours = OffPeakHours.getInstance(configuration);
   }
 
   private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -186,6 +190,10 @@ public void run() {
         LOG.debug("interrupt detected. terminating.");
         break;
       }
+      if (!offPeakHours.equals(OffPeakHours.DISABLED) && !offPeakHours.isOffPeakHour()) {
+        sleepToNextHour();
+        continue;
+      }
       final TableName tableName;
       try {
         tableName = workQueue.take();
@@ -199,6 +207,26 @@ public void run() {
     }
   }
 
+  /**
+   * Sleeps until the top of the next wall-clock hour so the off-peak window can be re-evaluated.
+   * If interrupted, restores the interrupt status so the main loop can observe it and terminate.
+   */
+  private void sleepToNextHour() {
+    LOG.info("Off-peak hours are configured; waiting for the off-peak window to continue normalizing.");
+    Calendar now = Calendar.getInstance();
+    Calendar nextHour = (Calendar) now.clone();
+    nextHour.add(Calendar.HOUR_OF_DAY, 1);
+    nextHour.set(Calendar.MINUTE, 0);
+    nextHour.set(Calendar.SECOND, 0);
+    nextHour.set(Calendar.MILLISECOND, 0);
+    try {
+      Thread.sleep(nextHour.getTimeInMillis() - now.getTimeInMillis());
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for the next hour.", e);
+      // Restore the interrupt status so the run() loop's interrupt check terminates the worker.
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private List<NormalizationPlan> calculatePlans(final TableName tableName) {
     if (masterServices.skipRegionManagementAction("region normalizer")) {
       return Collections.emptyList();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
index c5f0a201cb0c..4e90f6e3baf5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -20,6 +20,7 @@
 import static java.util.Collections.singletonList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.comparesEqualTo;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.nullValue;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.hamcrest.Description;
@@ -234,6 +236,41 @@ public void testPlansSizeLimit() throws Exception {
       worker::getSplitPlanCount, comparesEqualTo(1L));
   }
 
+  @Test
+  public void testNormalizerWorkInOffpeak() throws Exception {
+    final TableName tn = tableName.getTableName();
+    final TableDescriptor tnDescriptor =
+      TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
+    when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
+    when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
+    when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList(
+      new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
+        .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build()));
+
+    Configuration configuration = testingUtility.getConfiguration();
+    configuration.set("hbase.offpeak.start.hour", "16");
+    configuration.set("hbase.offpeak.end.hour", "17");
+
+    RegionNormalizerWorker worker =
+      new RegionNormalizerWorker(configuration, masterServices, regionNormalizer, queue);
+    long beforeMergePlanCount = worker.getMergePlanCount();
+    workerPool.submit(worker);
+    queue.put(tn);
+
+    // Give the worker time to act before asserting; outside off-peak the count must stay flat.
+    Thread.sleep(5000);
+
+    OffPeakHours offPeakHours = OffPeakHours.getInstance(configuration);
+    if (offPeakHours.isOffPeakHour()) {
+      assertThatEventually("executing work should see plan count increase",
+        worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
+    } else {
+      assertThatEventually("executing work should see plan count unchanged",
+        worker::getMergePlanCount, equalTo(beforeMergePlanCount));
+    }
+    workerPool.shutdownNow();
+  }
+
   /**
    * Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
    * the matcher succeeds or the timeout period of 30 seconds is exhausted.