@@ -3004,6 +3004,19 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
}

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId) {
+    return ProtobufUtil.getBuilder(server, regionName, destinationServer, closeProcId).build();
+  }
+
+  public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
+    ServerName destinationServer, long closeProcId, boolean evictCache) {
+    CloseRegionRequest.Builder builder =
+      getBuilder(server, regionName, destinationServer, closeProcId);
+    builder.setEvictCache(evictCache);
+    return builder.build();
+  }
+
+  public static CloseRegionRequest.Builder getBuilder(ServerName server, byte[] regionName,
+    ServerName destinationServer, long closeProcId) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region =
@@ -3016,7 +3029,7 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
builder.setServerStartCode(server.getStartcode());
}
builder.setCloseProcId(closeProcId);
-    return builder.build();
+    return builder;
}

public static ProcedureDescription buildProcedureDescription(String signature, String instance,
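For context, a minimal sketch of how the two overloads above differ at a hypothetical call site (server, regionName, destination, and procId are assumed to be in scope; this snippet is not code from the PR):

```java
// Old-style request: evict_cache is simply absent and defaults to false on the
// wire, so servers that predate the field behave as before.
CloseRegionRequest plain =
  ProtobufUtil.buildCloseRegionRequest(server, regionName, destination, procId);

// New overload: explicitly asks the region server to evict the region's cached
// blocks when it closes the region.
CloseRegionRequest evicting =
  ProtobufUtil.buildCloseRegionRequest(server, regionName, destination, procId, true);
assert evicting.getEvictCache();
```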
@@ -122,6 +122,7 @@ message CloseRegionRequest {
// the intended server for this RPC.
optional uint64 serverStartCode = 5;
optional int64 close_proc_id = 6 [default = -1];
+  optional bool evict_cache = 7 [default = false];
}

message CloseRegionResponse {
@@ -35,7 +35,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
// Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen()) {
+    if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
@@ -97,7 +97,7 @@ public void close(boolean evictOnClose) throws IOException {
if (evictOnClose) {
int numEvicted = cache.evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
LOG.trace("On close, file= {} evicted= {} block(s)", name, numEvicted);
}
}
});
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;

+import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;

import io.opentelemetry.api.common.Attributes;
@@ -40,12 +41,14 @@
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
+import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@@ -1264,6 +1267,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString());

+    boolean cacheable = cacheBlock && cacheIfCompactionsOff();
+
boolean useLock = false;
IdLock.Entry lockEntry = null;
final Span span = Span.current();
@@ -1305,7 +1310,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
return cachedBlock;
}

-        if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
+        if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
// check cache again with lock
useLock = true;
continue;
@@ -1324,10 +1329,10 @@

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
LOG.debug("Skipping decompression of block in prefetch");
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
-            if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+            if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
@@ -1340,7 +1345,7 @@
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
-          if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+          if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory(), cacheOnly);
@@ -1667,4 +1672,9 @@ public int getMajorVersion() {
public void unbufferStream() {
fsBlockReader.unbufferStream();
  }
+
+  protected boolean cacheIfCompactionsOff() {
+    return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
+      || !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
+  }
}
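The predicate above is easy to misread because of the double negation: it allows caching for ordinary store files unconditionally, and for references/links only while compactions are disabled. A hypothetical standalone rendering of the same logic:

```java
// Hypothetical rendering of cacheIfCompactionsOff with the inputs made explicit.
static boolean cacheAllowed(boolean isRefOrLink, boolean compactionsEnabled) {
  return !isRefOrLink || !compactionsEnabled;
}
// cacheAllowed(false, true)  -> true   plain hfile: always eligible for caching
// cacheAllowed(true,  true)  -> false  ref/link while compactions can run
// cacheAllowed(true,  false) -> true   ref/link, compactions off (e.g. split parent)
```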
@@ -105,7 +105,7 @@ static TransitRegionStateProcedure[] createUnassignProceduresForSplitOrMerge(
for (; i < procs.length; i++) {
RegionStateNode regionNode = regionNodes.get(i);
TransitRegionStateProcedure proc =
-        TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
+        TransitRegionStateProcedure.unassignSplitMerge(env, regionNode.getRegionInfo());
if (regionNode.getProcedure() != null) {
throw new HBaseIOException(
"The parent region " + regionNode + " is currently in transition, give up");
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
@@ -45,14 +46,17 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
// wrong(but do not make it wrong intentionally). The client can handle this error.
private ServerName assignCandidate;

+  private Optional<Boolean> evictCache;
+
public CloseRegionProcedure() {
super();
}

public CloseRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
-    ServerName targetServer, ServerName assignCandidate) {
+    ServerName targetServer, ServerName assignCandidate, Optional<Boolean> evictCache) {
super(parent, region, targetServer);
this.assignCandidate = assignCandidate;
+    this.evictCache = evictCache;
}

@Override
Expand All @@ -62,7 +66,7 @@ public TableOperationType getTableOperationType() {

@Override
public RemoteOperation newRemoteOperation() {
-    return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
+    return new RegionCloseOperation(this, region, getProcId(), assignCandidate, evictCache);
}

@Override
@@ -21,6 +21,7 @@

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -120,6 +121,8 @@ public class TransitRegionStateProcedure

private RegionRemoteProcedureBase remoteProc;

+  private Optional<Boolean> evictCache;
+
public TransitRegionStateProcedure() {
}

@@ -145,6 +148,12 @@ private void setInitialAndLastState() {

protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
ServerName assignCandidate, boolean forceNewPlan, TransitionType type) {
+    this(env, hri, assignCandidate, forceNewPlan, type, Optional.empty());
+  }
+
+  protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
+    ServerName assignCandidate, boolean forceNewPlan, TransitionType type,
+    Optional<Boolean> evictCache) {
super(env, hri);
this.assignCandidate = assignCandidate;
this.forceNewPlan = forceNewPlan;
@@ -155,6 +164,8 @@ protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
if (type == TransitionType.REOPEN) {
this.assignCandidate = getRegionStateNode(env).getRegionLocation();
    }
+
+    this.evictCache = evictCache;
}

@Override
@@ -265,7 +276,7 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) thr
// this is the normal case
env.getAssignmentManager().regionClosing(regionNode);
addChildProcedure(new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
-      assignCandidate));
+      assignCandidate, evictCache));
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
} else {
forceNewPlan = true;
@@ -586,6 +597,12 @@ public static TransitRegionStateProcedure unassign(MasterProcedureEnv env, Regio
new TransitRegionStateProcedure(env, region, null, false, TransitionType.UNASSIGN));
  }
+
+  public static TransitRegionStateProcedure unassignSplitMerge(MasterProcedureEnv env,
+    RegionInfo region) {
+    return setOwner(env, new TransitRegionStateProcedure(env, region, null, false,
+      TransitionType.UNASSIGN, Optional.of(true)));
+  }

public static TransitRegionStateProcedure reopen(MasterProcedureEnv env, RegionInfo region) {
return setOwner(env,
new TransitRegionStateProcedure(env, region, null, false, TransitionType.REOPEN));
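Pulling the master-side pieces together, the flag travels as follows (a sketch using names from this diff; the procedure-framework plumbing is elided and the snippet is not runnable on its own):

```java
// Split/merge paths now request an unassign that evicts cached blocks:
TransitRegionStateProcedure proc =
  TransitRegionStateProcedure.unassignSplitMerge(env, regionInfo); // evictCache = Optional.of(true)
// closeRegion(...) passes the Optional into the child procedure:
//   new CloseRegionProcedure(this, getRegion(), location, assignCandidate, evictCache)
// CloseRegionProcedure.newRemoteOperation() forwards it to RegionCloseOperation,
// which sets evict_cache on the outgoing request only when the Optional is present.
```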
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
@@ -463,20 +464,27 @@ public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInf

public static class RegionCloseOperation extends RegionOperation {
private final ServerName destinationServer;
+    private final Optional<Boolean> evictCache;

public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
-      ServerName destinationServer) {
+      ServerName destinationServer, Optional<Boolean> evictCache) {
super(remoteProcedure, regionInfo, procId);
this.destinationServer = destinationServer;
+      this.evictCache = evictCache;
}

public ServerName getDestinationServer() {
return destinationServer;
}

public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
-      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
-        getDestinationServer(), procId);
+      if (evictCache.isPresent()) {
+        return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
+          getDestinationServer(), procId, evictCache.get());
+      } else {
+        return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
+          getDestinationServer(), procId);
+      }
}
}
}
@@ -849,7 +849,7 @@ public boolean isCompactionsEnabled() {

public void setCompactionsEnabled(boolean compactionsEnabled) {
this.compactionsEnabled = compactionsEnabled;
-    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled));
+    this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
}

/** Returns the longCompactions thread pool executor */
@@ -2962,6 +2962,9 @@ private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
* <p>
* If a close was in progress, this new request will be ignored, and an exception thrown.
* </p>
+   * <p>
+   * Provides an additional flag to indicate whether this region's blocks should be evicted from the cache.
+   * </p>
* @param encodedName Region to close
* @param abort True if we are aborting
* @param destination Where the Region is being moved too... maybe null if unknown.
@@ -3862,9 +3862,10 @@ private void executeCloseRegionProcedures(CloseRegionRequest request) {
? ProtobufUtil.toServerName(request.getDestinationServer())
: null;
long procId = request.getCloseProcId();
+    boolean evictCache = request.getEvictCache();
if (server.submitRegionProcedure(procId)) {
-      server.getExecutorService()
-        .submit(UnassignRegionHandler.create(server, encodedName, procId, false, destination));
+      server.getExecutorService().submit(
+        UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
}
}

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.handler;

+import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.HConstants;
@@ -27,6 +29,7 @@
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
@@ -277,6 +280,10 @@ Throwable getException() {
/** Returns Instance of HRegion if successful open else null. */
private HRegion openRegion() {
HRegion region = null;
+    boolean compactionEnabled =
+      ((HRegionServer) server).getCompactSplitThread().isCompactionsEnabled();
+    this.server.getConfiguration().setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,
+      compactionEnabled);

Review thread on the setBoolean call above:

Contributor: Why change the global configuration here?

Contributor (author): We want to prefetch blocks for links/refs only if compaction is disabled, but the prefetch happens in the HFileReader, where we don't have access to the CompactSplitThread, so we save the state of the compaction switch in the config for the HFileReader to decide whether the prefetch should go ahead.
try {
// Instantiate the region. This also periodically tickles OPENING
// state so master doesn't timeout this region in transition.
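The mechanism described in the reply can be shown in isolation: one component records the compaction switch under a Configuration key, and a later reader of the same key acts on it. A minimal sketch, assuming the constant's value is "hbase.regionserver.compaction.enabled" (the constant's definition in CompactSplit is not part of this diff):

```java
import org.apache.hadoop.conf.Configuration;

public class CompactionSwitchSignalSketch {
  // Assumed value of CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION.
  static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
    "hbase.regionserver.compaction.enabled";

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Open side: record the current state of the compaction switch.
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, false);
    // Read side (cf. cacheIfCompactionsOff): prefetch refs/links only while
    // compactions are disabled.
    boolean prefetchRefsAndLinks = !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
    System.out.println("prefetch refs/links? " + prefetchRefsAndLinks); // true
  }
}
```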
@@ -60,14 +60,22 @@ public class UnassignRegionHandler extends EventHandler {

private final RetryCounter retryCounter;

+  private boolean evictCache;
+
public UnassignRegionHandler(HRegionServer server, String encodedName, long closeProcId,
boolean abort, @Nullable ServerName destination, EventType eventType) {
+    this(server, encodedName, closeProcId, abort, destination, eventType, false);
+  }
+
+  public UnassignRegionHandler(HRegionServer server, String encodedName, long closeProcId,
+    boolean abort, @Nullable ServerName destination, EventType eventType, boolean evictCache) {
super(server, eventType);
this.encodedName = encodedName;
this.closeProcId = closeProcId;
this.abort = abort;
this.destination = destination;
this.retryCounter = HandlerUtil.getRetryCounter();
+    this.evictCache = evictCache;
}

private HRegionServer getServer() {
Expand Down Expand Up @@ -113,6 +121,12 @@ public void process() throws IOException {
// abort the RS...
region.getCoprocessorHost().preClose(abort);
}
+      // This should be true only in the case of splits/merges closing the parent regions, as
+      // there's no point in keeping blocks for those region files. As hbase.rs.evictblocksonclose
+      // is false by default, we don't bother overriding it if evictCache is false.
+      if (evictCache) {
+        region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(true));
+      }
if (region.close(abort) == null) {
// XXX: Is this still possible? The old comment says about split, but now split is done at
// master side, so...
@@ -144,14 +158,14 @@ protected void handleException(Throwable t) {
}

public static UnassignRegionHandler create(HRegionServer server, String encodedName,
-    long closeProcId, boolean abort, @Nullable ServerName destination) {
+    long closeProcId, boolean abort, @Nullable ServerName destination, boolean evictCache) {
// Just try our best to determine whether it is for closing meta. It is not the end of the world
// if we put the handler into a wrong executor.
Region region = server.getRegion(encodedName);
EventType eventType = region != null && region.getRegionInfo().isMetaRegion()
? EventType.M_RS_CLOSE_META
: EventType.M_RS_CLOSE_REGION;
return new UnassignRegionHandler(server, encodedName, closeProcId, abort, destination,
-      eventType);
+      eventType, evictCache);
}
}
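Downstream, the per-store override in process() feeds the evictOnClose path shown earlier in HFilePreadReader: closing a reader with eviction enabled drops every block cached under that file's name. A minimal sketch of that last step (BlockCache.evictBlocksByHfileName is the existing HBase API; the helper itself is hypothetical):

```java
import org.apache.hadoop.hbase.io.hfile.BlockCache;

final class EvictOnCloseSketch {
  // Mirrors HFilePreadReader.close(evictOnClose) from earlier in this diff.
  static int closeReader(BlockCache cache, String hfileName, boolean evictOnClose) {
    return evictOnClose ? cache.evictBlocksByHfileName(hfileName) : 0;
  }
}
```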