Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,

boolean prefetchComplete();

boolean prefetchStarted();

/**
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
* implementation should take care of thread safety.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,15 @@ public boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}

/**
* Returns true if block prefetching was started after waiting for specified delay, false
* otherwise
*/
@Override
public boolean prefetchStarted() {
return PrefetchExecutor.isPrefetchStarted();
}

/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand All @@ -41,13 +44,18 @@
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
public static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be private.

/** Runnables for resetting the prefetch activity */
public static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be private.

/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
public static final ScheduledExecutorService prefetchExecutorPool;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be private.

/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
private static int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
static {
Expand All @@ -56,8 +64,8 @@ public final class PrefetchExecutor {
Configuration conf = HBaseConfiguration.create();
// 1s here for tests, consider 30s in hbase-default.xml
// Set to 0 for no delay
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
prefetchDelayVariation = conf.getFloat(PREFETCH_DELAY_VARIATION, 0.2f);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
@Override
Expand Down Expand Up @@ -95,15 +103,18 @@ public static void request(Path path, Runnable runnable) {
final Future<?> future =
prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
prefetchFutures.put(path, future);
prefetchRunnable.put(path, runnable);
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.warn("Prefetch request rejected for {}", path);
}
}
}

public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch completed for {}", path.getName());
}
Expand All @@ -115,23 +126,82 @@ public static void cancel(Path path) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
prefetchRunnable.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}

public static boolean isCompleted(Path path) {
public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
prefetchFutures.remove(path);
// ok to race with other cancellation attempts
future.cancel(true);
LOG.debug("Prefetch cancelled for {}", path);
}
return true;
}

private PrefetchExecutor() {
}

public static boolean isCompleted(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
}
return true;
}

/* Visible for testing only */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Future<?>> getPrefetchFutures() {
return prefetchFutures;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
static Map<Path, Runnable> getPrefetchRunnable() {
return prefetchRunnable;
}

static boolean isPrefetchStarted() {
AtomicBoolean prefetchStarted = new AtomicBoolean(false);
for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
Path k = entry.getKey();
Future<?> v = entry.getValue();
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
if (waitTime < 0) {
// At this point prefetch is started
prefetchStarted.set(true);
break;
}
}
return prefetchStarted.get();
}

public static int getPrefetchDelay() {
return prefetchDelayMillis;
}

public static void loadConfiguration(Configuration conf) {
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
prefetchFutures.forEach((k, v) -> {
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
// the thread is still pending delay expiration and has not started to run yet, so can be
// re-scheduled at no cost.
interrupt(k);
request(k, prefetchRunnable.get(k));
}
LOG.debug("Reset called on Prefetch of file {} with delay {}", k, prefetchDelayMillis);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/
private ReplicationMarkerChore replicationMarkerChore;

// A timer submit requests to the PrefetchExecutor
private PrefetchExecutorNotifier prefetchExecutorNotifier;

/**
* Starts a HRegionServer at the default location.
* <p/>
Expand Down Expand Up @@ -2039,6 +2042,9 @@ private void initializeThreads() {
// Compaction thread
this.compactSplitThread = new CompactSplit(this);

// Prefetch Notifier
this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);

// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
Expand Down Expand Up @@ -2128,6 +2134,7 @@ private void registerConfigurationObservers() {
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this.prefetchExecutorNotifier);
configurationManager.registerObserver(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to submit requests for PrefetchExecutor depending on configuration change
*/
@InterfaceAudience.Private
public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

/** Wait time in miliseconds before executing prefetch */
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
private final Configuration conf;

// only for test
public PrefetchExecutorNotifier(Configuration conf) {
this.conf = conf;
}

/**
* {@inheritDoc}
*/
@Override
public void onConfigurationChange(Configuration newConf) {
// Update prefetch delay in the prefetch executor class
// interrupt and restart threads which have not started executing
PrefetchExecutor.loadConfiguration(conf);
LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
conf.getInt(PREFETCH_DELAY, 1000));
}

/**
* {@inheritDoc}
*/
@Override
public void registerChildren(ConfigurationManager manager) {
// No children to register.
}

/**
* {@inheritDoc}
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
// No children to register
}

public int getPrefetchDelay() {
return PrefetchExecutor.getPrefetchDelay();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
Expand Down Expand Up @@ -95,7 +97,6 @@ public class TestPrefetch {
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 1000;

private Configuration conf;
private CacheConfig cacheConf;
private FileSystem fs;
Expand Down Expand Up @@ -336,6 +337,62 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
});
}

@Test
public void testOnConfigurationChange() {
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 40000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000);

// restore
conf.setInt(PREFETCH_DELAY, 30000);
prefetchExecutorNotifier.onConfigurationChange(conf);
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000);

conf.setInt(PREFETCH_DELAY, 1000);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchWithDelay() throws Exception {
// Configure custom delay
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
conf.setInt(PREFETCH_DELAY, 25000);
prefetchExecutorNotifier.onConfigurationChange(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);

HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
long startTime = System.currentTimeMillis();

// Wait for 20 seconds, no thread should start prefetch
Thread.sleep(20000);
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
while (!reader.prefetchStarted()) {
assertTrue("Prefetch delay has not been expired yet",
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
}

// Prefech threads started working but not completed yet
assertFalse(reader.prefetchComplete());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we testing this? IMO, it's irrelevant for the feature being implemented here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is added to cover cases where prefetch is completed very fast. If this check fails then it would give clear idea during investigation. Also, this assertion will ensure the subsequent assert is tested after prefetch started and before it completes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are actually introducing flakeyness here, which we don't want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will remove this assert.


// In prefetch executor, we further compute passed in delay using variation and a random
// multiplier to get 'effective delay'. Hence, in the test, for delay of 25000 milli-secs
// check that prefetch is started after 20000 milli-sec and prefetch started after that.
// However, prefetch should not start after configured delay.
if (reader.prefetchStarted()) {
LOG.info("elapsed time {}, Delay {}", getElapsedTime(startTime),
PrefetchExecutor.getPrefetchDelay());
assertTrue("Prefetch should start post configured delay",
getElapsedTime(startTime) <= PrefetchExecutor.getPrefetchDelay());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really confusing. Why prefetch is supposed to start before the passed delay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In PrefetchExecutor class, passed in dealy is modified due to further computation.

 if (prefetchDelayMillis > 0) {
    delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
      + (prefetchDelayMillis * (prefetchDelayVariation / 2)
        * ThreadLocalRandom.current().nextFloat()));

Prefetch delay variation and a random seed is used for this computation. This reduces the "effective delay" compared to passed in delay. The same is passed to schedule().

(Due to this, in one of the earlier patch sets, I had introduced new class variable to be able to compare against "effective delay")

Hence, in the test,

  1. set the delay to 25000
  2. ensurre that prefetch is not started till 20000
  3. In the window where prefetch is started and it's not complete, test that elapsed time is less than passed in prefetch delay

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then just set the hbase.hfile.prefetch.delay.variation to 0 on your test. This way, you will have a deterministic delay of 25s.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already tried that. If the hbase.hfile.prefetch.delay.variation is set to 0 in the test, it does not take effect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the formula below, if you set it to 0, it should pick exactly the value of hbase.hfile.prefetch.delay. So either your test is not setting the config properly, or the dynamic config logic isn't working properly.

delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) + (prefetchDelayMillis * (prefetchDelayVariation / 2) * ThreadLocalRandom.current().nextFloat()));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not also make it dynamic? Couldn't you just also set it in the PrefetchExecutor.loadConfiguration?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good idea.

Copy link
Contributor Author

@kabhishek4 kabhishek4 Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For our initial requirement, making prefech delay alone dynamic is sufficient.

prefetchDelayVariation can be updated in the loadConfiguration but it will be useful for testing only, atleast for now. Trying it out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, this is needed in certain cases where the user is expecting the exact delay before prefetch starts. With the delay variation, this delay is not fixed anymore which means the prefetch can trigger earlier or later depending on the calculated variance. Hence, IMO, this is not only a test only change, but it has its own merit. My 2 cents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but this property is not externalised to the user, probably can be provided as a workaround in such cases. As mentioned above, I am trying out the change.


conf.setInt(PREFETCH_DELAY, 1000);
prefetchExecutorNotifier.onConfigurationChange(conf);
}

@Test
public void testPrefetchDoesntSkipHFileLink() throws Exception {
testPrefetchWhenHFileLink(c -> {
Expand Down Expand Up @@ -490,4 +547,7 @@ public static KeyValue.Type generateKeyType(Random rand) {
}
}

private long getElapsedTime(long startTime) {
return System.currentTimeMillis() - startTime;
}
}