@@ -2615,4 +2615,13 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Clean the cache by evicting blocks of files that belong to regions no longer served by the
* RegionServer.
* @param serverName the RegionServer whose stale blocks should be evicted
* @return a map from file name to the number of blocks evicted
* @throws IOException if a remote or network exception occurs
*/
Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException;
}
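
For orientation, a minimal sketch of calling the new blocking Admin API from a client (illustrative only, not part of this diff; the class name is made up and the server name is a placeholder in host,port,startcode form):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UncacheStaleBlocksExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // Placeholder server name: host,port,startcode.
      ServerName sn = ServerName.valueOf("rs1.example.com,16020,1700000000000");
      // Ask the RegionServer to evict cached blocks of regions it no longer serves.
      Map<String, Integer> evicted = admin.uncacheStaleBlocks(sn);
      evicted.forEach((file, blocks) ->
        System.out.println(file + " -> " + blocks + " blocks evicted"));
    }
  }
}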
@@ -1115,4 +1115,9 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
public void flushMasterStore() throws IOException {
get(admin.flushMasterStore());
}

@Override
public Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException {
return get(admin.uncacheStaleBlocks(serverName));
}
}
@@ -1837,4 +1837,12 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();

/**
* Clean the cache by evicting blocks of files that belong to regions no longer served by the
* RegionServer.
* @param serverName the RegionServer whose stale blocks should be evicted
* @return a map from file name to the number of blocks evicted
*/
CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName);
}
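
The async variant returns a CompletableFuture, so the caller can chain a callback instead of blocking; a minimal sketch (again illustrative only, with the same placeholder server name):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UncacheStaleBlocksAsyncExample {
  public static void main(String[] args) {
    ConnectionFactory.createAsyncConnection(HBaseConfiguration.create())
      // Connection close omitted for brevity.
      .thenCompose(conn -> conn.getAdmin()
        .uncacheStaleBlocks(ServerName.valueOf("rs1.example.com,16020,1700000000000")))
      .thenAccept(evicted -> evicted.forEach(
        (file, blocks) -> System.out.println(file + ": " + blocks + " blocks evicted")))
      .join(); // block only so the demo JVM does not exit early
  }
}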
@@ -990,4 +990,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
return wrap(rawAdmin.uncacheStaleBlocks(serverName));
}
}
@@ -142,6 +142,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -4453,4 +4455,15 @@ Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
.call();
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
UncacheStaleBlocksRequest.Builder request = UncacheStaleBlocksRequest.newBuilder();
return this.<Map<String, Integer>> newAdminCaller()
.action((controller, stub) -> this.<UncacheStaleBlocksRequest, UncacheStaleBlocksResponse,
Map<String, Integer>> adminCall(controller, stub, request.build(),
(s, c, req, done) -> s.uncacheStaleBlocks(c, req, done),
resp -> resp.getUncachedFilesMap()))
.serverName(serverName).call();
}
}
@@ -163,6 +163,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -3767,4 +3769,20 @@ public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T>
return parser.parseFrom(bytes);
}
}

/**
* Clean the cache by evicting blocks of files that belong to regions no longer served by the
* RegionServer.
*/
public static Map<String, Integer> uncacheStaleBlocks(final RpcController controller,
final AdminService.BlockingInterface admin) throws IOException {
UncacheStaleBlocksRequest request = UncacheStaleBlocksRequest.newBuilder().build();
UncacheStaleBlocksResponse response = null;
try {
response = admin.uncacheStaleBlocks(controller, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
return response.getUncachedFilesMap();
}
}
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -329,6 +329,13 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}

message UncacheStaleBlocksRequest {
}

message UncacheStaleBlocksResponse {
map<string, int32> uncached_files = 1;
}

service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@@ -405,4 +412,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc UncacheStaleBlocks(UncacheStaleBlocksRequest)
returns(UncacheStaleBlocksResponse);

}
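
For orientation, protobuf's map<string, int32> field generates the Java map accessors used elsewhere in this diff (putUncachedFiles/putAllUncachedFiles on the builder, getUncachedFilesMap on the message). A fragment, with a made-up file name:

UncacheStaleBlocksResponse resp = UncacheStaleBlocksResponse.newBuilder()
  .putUncachedFiles("0a1b2c3d4e5f", 42) // one evicted file -> block count
  .build();
Map<String, Integer> uncached = resp.getUncachedFilesMap();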
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.io.hfile;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -161,4 +164,14 @@ default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repe
default boolean isMetaBlock(BlockType blockType) {
return blockType != null && blockType.getCategory() != BlockType.BlockCategory.DATA;
}

/**
* Clean the cache by evicting blocks of files that belong to regions no longer served by the
* RegionServer.
* @param server the RegionServer hosting this cache
* @return a map from file name to the number of blocks evicted
*/
default Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
return Optional.empty();
}
}

Review comment (Contributor), on uncacheStaleBlocks(HRegionServer):
I would prefer that we pass an interface here for testing whether a region is available, instead of passing an HRegionServer directly; it will be easier for testing. Also, is just returning a Map enough? If we do not clean any blocks, we could just return an empty map.
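
A minimal sketch of the suggested alternative, assuming a hypothetical RegionAvailabilityChecker interface (the name and shape are illustrative, not part of the PR):

// Hypothetical narrow interface: the cache only needs to ask whether a
// region is still available, which is trivial to stub in tests.
@FunctionalInterface
interface RegionAvailabilityChecker {
  boolean isAvailable(String encodedRegionName);
}

// Alternative default method returning a plain Map, empty when nothing was
// evicted, per the second review point.
default Map<String, Integer> uncacheStaleBlocks(RegionAvailabilityChecker checker) {
  return java.util.Collections.emptyMap();
}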
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -400,4 +404,13 @@ public FirstLevelBlockCache getFirstLevelCache() {
public BlockCache getSecondLevelCache() {
return l2Cache;
}

@Override
public Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
Map<String, Integer> uncachedStaleBlocksMap =
l1Cache.uncacheStaleBlocks(server).orElseGet(HashMap::new);
l2Cache.uncacheStaleBlocks(server).ifPresent(
map2 -> map2.forEach((key, value) -> uncachedStaleBlocksMap.merge(key, value, Integer::sum)));
return Optional.of(uncachedStaleBlocksMap);
}
}
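
The L1/L2 combination above relies on Map.merge with Integer::sum; a self-contained illustration of that merge semantics (file names and counts are made up):

import java.util.HashMap;
import java.util.Map;

public class MergeSemanticsDemo {
  public static void main(String[] args) {
    Map<String, Integer> combined = new HashMap<>(Map.of("hfile-a", 3, "hfile-b", 1)); // L1 counts
    Map<String, Integer> l2Counts = Map.of("hfile-b", 4, "hfile-c", 2);
    l2Counts.forEach((file, n) -> combined.merge(file, n, Integer::sum));
    // combined now maps {hfile-a=3, hfile-b=5, hfile-c=2} (iteration order may
    // vary): files evicted at both levels have their block counts summed,
    // the rest pass through unchanged.
    System.out.println(combined);
  }
}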
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -55,6 +56,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -75,6 +77,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -2002,4 +2005,27 @@ public void fileCacheCompleted(Path filePath, long size) {
regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
}

@Override
public Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
Map<String, Integer> evictedFilesWithStaleBlocks = new HashMap<>();

fullyCachedFiles.forEach((fileName, value) -> {
int blocksEvicted;
try {
if (!server.getRegionByEncodedName(value.getFirst()).isAvailable()) {
blocksEvicted = evictBlocksByHfileName(fileName);
} else {
blocksEvicted = 0;
}
} catch (NotServingRegionException nsre) {
LOG.debug(
"Evicting blocks for file {} as the region {} is not served by the Region Server {} anymore.",
fileName, fullyCachedFiles.get(fileName).getFirst(),
server.getServerName().getServerName());
blocksEvicted = evictBlocksByHfileName(fileName);
}
evictedFilesWithStaleBlocks.put(fileName, blocksEvicted);
});
return Optional.of(evictedFilesWithStaleBlocks);
}
}

Review comment (Contributor), on the catch of NotServingRegionException:
Is this the normal path? In the if condition above we test isAvailable, so why could we still get a NotServingRegionException here?
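
One plausible reading, not confirmed in the review thread: the lookup itself throws NotServingRegionException once the region has been fully moved off this server, while the isAvailable() check catches a region that is still hosted but closing. A Mockito-based sketch of the two paths (the encoded region names are placeholders):

import static org.mockito.Mockito.*;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

HRegionServer server = mock(HRegionServer.class);
HRegion closingRegion = mock(HRegion.class);
when(closingRegion.isAvailable()).thenReturn(false);
// Path 1: region still hosted but closing -> lookup succeeds and the
// if-branch evicts because isAvailable() is false.
when(server.getRegionByEncodedName("enc-closing")).thenReturn(closingRegion);
// Path 2: region already moved off this server -> the lookup itself throws
// and the catch-branch evicts.
when(server.getRegionByEncodedName("enc-moved"))
  .thenThrow(new NotServingRegionException("enc-moved"));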
@@ -182,6 +182,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
@@ -3609,4 +3611,10 @@ public FlushTableResponse flushTable(RpcController controller, FlushTableRequest
throw new ServiceException(ioe);
}
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}
}
@@ -190,6 +190,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
@@ -3933,4 +3935,15 @@ public void onConfigurationChange(Configuration conf) {
super.onConfigurationChange(conf);
setReloadableGuardrails(conf);
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
UncacheStaleBlocksResponse.Builder responseBuilder = UncacheStaleBlocksResponse.newBuilder();
Map<String, Integer> evictedFilesWithStaleBlocks = new HashMap<>();
server.getBlockCache().flatMap(bc -> bc.uncacheStaleBlocks(server))
.ifPresent(evictedFilesWithStaleBlocks::putAll);
responseBuilder.putAllUncachedFiles(evictedFilesWithStaleBlocks);
return responseBuilder.build();
}
}