Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,6 +70,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private long splitPlanCount;
private long mergePlanCount;
private final AtomicLong cumulativePlansSizeLimitMb;
private final OffPeakHours offPeakHours;

RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
Expand All @@ -81,6 +84,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb = new AtomicLong(
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
this.offPeakHours = OffPeakHours.getInstance(configuration);
}

private boolean extractDefaultNormalizerValue(final Configuration configuration) {
Expand Down Expand Up @@ -186,6 +190,10 @@ public void run() {
LOG.debug("interrupt detected. terminating.");
break;
}
if (!offPeakHours.equals(OffPeakHours.DISABLED) && !offPeakHours.isOffPeakHour()) {
sleepToNextHour();
continue;
}
final TableName tableName;
try {
tableName = workQueue.take();
Expand All @@ -199,6 +207,22 @@ public void run() {
}
}

private void sleepToNextHour() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add an info log saying offpeak hours is configured and we'll wait for offpeak hours to continue normalising.

LOG.info("offpeak hours is configured and we'll wait for offpeak hours to continue normalising.");
Calendar now = Calendar.getInstance();
Calendar nextHour = (Calendar) now.clone();
nextHour.add(Calendar.HOUR_OF_DAY, 1);
nextHour.set(Calendar.MINUTE, 0);
nextHour.set(Calendar.SECOND, 0);
nextHour.set(Calendar.MILLISECOND, 0);
try {
Thread.sleep(nextHour.getTimeInMillis() - now.getTimeInMillis());
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to next hour.");
e.printStackTrace();
}
}

private List<NormalizationPlan> calculatePlans(final TableName tableName) {
if (masterServices.skipRegionManagementAction("region normalizer")) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.hamcrest.Description;
Expand Down Expand Up @@ -234,6 +236,40 @@ public void testPlansSizeLimit() throws Exception {
comparesEqualTo(1L));
}

@Test
public void testNormalizerWorkInOffpeak() throws Exception {
  final TableName tn = tableName.getTableName();
  final TableDescriptor tnDescriptor =
    TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
  when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
  when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
  when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(singletonList(
    new MergeNormalizationPlan.Builder().addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
      .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20).build()));

  // Configure the off-peak window to cover the current hour so the worker deterministically
  // processes the queued table. A hard-coded window (e.g. 16-17) would make the outcome
  // depend on the wall-clock time at which the test happens to run.
  final int currentHour = java.time.LocalTime.now().getHour();
  Configuration configuration = testingUtility.getConfiguration();
  configuration.set("hbase.offpeak.start.hour", String.valueOf(currentHour));
  configuration.set("hbase.offpeak.end.hour", String.valueOf((currentHour + 1) % 24));

  RegionNormalizerWorker worker =
    new RegionNormalizerWorker(configuration, masterServices, regionNormalizer, queue);
  long beforeMergePlanCount = worker.getMergePlanCount();
  workerPool.submit(worker);
  queue.put(tn);

  // assertThatEventually already polls with its own timeout; no fixed Thread.sleep needed.
  assertThatEventually("executing work should see plan count increase",
    worker::getMergePlanCount, greaterThan(beforeMergePlanCount));
  workerPool.shutdownNow();
}

/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
Expand Down