Skip to content

Commit

Permalink
HBASE-29090: Add server-side load metrics to client results
Browse files Browse the repository at this point in the history
  • Loading branch information
Hernan Gelaf-Romer committed Jan 22, 2025
1 parent 6f8db78 commit def248c
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public Get(Get get) {
this.setFilter(get.getFilter());
this.setReplicaId(get.getReplicaId());
this.setConsistency(get.getConsistency());
this.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
// from Get
this.cacheBlocks = get.getCacheBlocks();
this.maxVersions = get.getMaxVersions();
Expand Down Expand Up @@ -453,6 +454,7 @@ public Map<String, Object> toMap(int maxCols) {
map.put("colFamTimeRangeMap", colFamTimeRangeMapStr);
}
map.put("priority", getPriority());
map.put("queryMetricsEnabled", queryMetricsEnabled);
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class Query extends OperationWithAttributes {
protected Consistency consistency = Consistency.STRONG;
protected Map<byte[], TimeRange> colFamTimeRangeMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected Boolean loadColumnFamiliesOnDemand = null;
protected boolean queryMetricsEnabled = false;

public Filter getFilter() {
return filter;
Expand Down Expand Up @@ -157,6 +158,15 @@ public Query setIsolationLevel(IsolationLevel level) {
return this;
}

public Query setQueryMetricsEnabled(boolean enabled) {
this.queryMetricsEnabled = enabled;
return this;
}

public boolean isQueryMetricsEnabled() {
return queryMetricsEnabled;
}

/**
* Returns The isolation level of this query. If no isolation level was set for this query object,
* then it returns READ_COMMITTED.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Public
public class QueryMetrics {
private final long blockBytesScanned;

public QueryMetrics(long blockBytesScanned) {
this.blockBytesScanned = blockBytesScanned;
}

public long getBlockBytesScanned() {
return blockBytesScanned;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class Result implements ExtendedCellScannable, ExtendedCellScanner {
*/
private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
private RegionLoadStats stats;
private QueryMetrics metrics = null;

private final boolean readonly;

Expand Down Expand Up @@ -931,6 +932,11 @@ public void setStatistics(RegionLoadStats loadStats) {
this.stats = loadStats;
}

@InterfaceAudience.Private
public void setMetrics(QueryMetrics metrics) {
this.metrics = metrics;
}

/**
* Returns the associated statistics about the region from which this was returned. Can be
* <tt>null</tt> if stats are disabled.
Expand All @@ -939,6 +945,11 @@ public RegionLoadStats getStats() {
return stats;
}

/** Returns the query metrics, or {@code null} if we do not enable metrics. */
public QueryMetrics getMetrics() {
return metrics;
}

/**
* All methods modifying state of Result object must call this method to ensure that special
* purpose immutable Results can't be accidentally modified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public Scan(Scan scan) throws IOException {
setPriority(scan.getPriority());
readType = scan.getReadType();
super.setReplicaId(scan.getReplicaId());
super.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
}

/**
Expand Down Expand Up @@ -249,6 +250,7 @@ public Scan(Get get) {
this.mvccReadPoint = -1L;
setPriority(get.getPriority());
super.setReplicaId(get.getReplicaId());
super.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
}

public boolean isGetScan() {
Expand Down Expand Up @@ -826,6 +828,7 @@ public Map<String, Object> toMap(int maxCols) {
map.put("colFamTimeRangeMap", colFamTimeRangeMapStr);
}
map.put("priority", getPriority());
map.put("queryMetricsEnabled", queryMetricsEnabled);
return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
Expand Down Expand Up @@ -659,6 +660,7 @@ public static Get toGet(final ClientProtos.Get proto) throws IOException {
if (proto.hasLoadColumnFamiliesOnDemand()) {
get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
}
get.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
return get;
}

Expand Down Expand Up @@ -1096,6 +1098,7 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException {
if (scan.isNeedCursorResult()) {
scanBuilder.setNeedCursorResult(true);
}
scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
return scanBuilder.build();
}

Expand Down Expand Up @@ -1200,6 +1203,7 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException {
if (proto.getNeedCursorResult()) {
scan.setNeedCursorResult(true);
}
scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
return scan;
}

Expand Down Expand Up @@ -1279,6 +1283,7 @@ public static ClientProtos.Get toGet(final Get get) throws IOException {
if (loadColumnFamiliesOnDemand != null) {
builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
builder.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
return builder.build();
}

Expand Down Expand Up @@ -1434,6 +1439,10 @@ public static ClientProtos.Result toResult(final Result result, boolean encodeTa
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());

if (result.getMetrics() != null) {
builder.setMetrics(toQueryMetrics(result.getMetrics()));
}

return builder.build();
}

Expand Down Expand Up @@ -1463,6 +1472,9 @@ public static ClientProtos.Result toResultNoData(final Result result) {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
builder.setStale(result.isStale());
if (result.getMetrics() != null) {
builder.setMetrics(toQueryMetrics(result.getMetrics()));
}
return builder.build();
}

Expand Down Expand Up @@ -1503,7 +1515,11 @@ public static Result toResult(final ClientProtos.Result proto, boolean decodeTag
for (CellProtos.Cell c : values) {
cells.add(toCell(builder, c, decodeTags));
}
return Result.create(cells, null, proto.getStale(), proto.getPartial());
Result r = Result.create(cells, null, proto.getStale(), proto.getPartial());
if (proto.hasMetrics()) {
r.setMetrics(toQueryMetrics(proto.getMetrics()));
}
return r;
}

/**
Expand Down Expand Up @@ -1548,9 +1564,15 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner
}
}

return (cells == null || cells.isEmpty())
Result r = (cells == null || cells.isEmpty())
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
: Result.create(cells, null, proto.getStale());

if (proto.hasMetrics()) {
r.setMetrics(toQueryMetrics(proto.getMetrics()));
}

return r;
}

/**
Expand Down Expand Up @@ -3811,6 +3833,15 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}

public static ClientProtos.QueryMetrics toQueryMetrics(QueryMetrics metrics) {
return ClientProtos.QueryMetrics.newBuilder()
.setBlockBytesScanned(metrics.getBlockBytesScanned()).build();
}

public static QueryMetrics toQueryMetrics(ClientProtos.QueryMetrics metrics) {
return new QueryMetrics(metrics.getBlockBytesScanned());
}

/**
* Check whether this IPBE indicates EOF or not.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SingleResponse;
Expand Down Expand Up @@ -417,6 +418,7 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response
int noOfResults =
cellScanner != null ? response.getCellsPerResultCount() : response.getResultsCount();
Result[] results = new Result[noOfResults];
List<ClientProtos.QueryMetrics> queryMetrics = response.getQueryMetricsList();
for (int i = 0; i < noOfResults; i++) {
if (cellScanner != null) {
// Cells are out in cellblocks. Group them up again as Results. How many to read at a
Expand Down Expand Up @@ -453,6 +455,12 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response
// Result is pure pb.
results[i] = ProtobufUtil.toResult(response.getResults(i));
}

// Populate result metrics if they exist
if (queryMetrics.size() > i) {
QueryMetrics metrics = ProtobufUtil.toQueryMetrics(queryMetrics.get(i));
results[i].setMetrics(metrics);
}
}
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public void TestGetRowFromGetCopyConstructor() throws Exception {
get.setMaxResultsPerColumnFamily(10);
get.setRowOffsetPerColumnFamily(11);
get.setCacheBlocks(true);
get.setQueryMetricsEnabled(true);

Get copyGet = new Get(get);
assertEquals(0, Bytes.compareTo(get.getRow(), copyGet.getRow()));
Expand All @@ -196,6 +197,7 @@ public void TestGetRowFromGetCopyConstructor() throws Exception {
assertEquals(get.getConsistency(), copyGet.getConsistency());
assertEquals(get.getReplicaId(), copyGet.getReplicaId());
assertEquals(get.getIsolationLevel(), copyGet.getIsolationLevel());
assertTrue(get.isQueryMetricsEnabled());

// from Get class
assertEquals(get.isCheckExistenceOnly(), copyGet.isCheckExistenceOnly());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public void testGetToScan() throws Exception {
.setAttribute("att_v0", Bytes.toBytes("att_v0"))
.setColumnFamilyTimeRange(Bytes.toBytes("cf"), 0, 123).setReplicaId(3)
.setACL("test_user", new Permission(Permission.Action.READ))
.setAuthorizations(new Authorizations("test_label")).setPriority(3);
.setAuthorizations(new Authorizations("test_label")).setQueryMetricsEnabled(true)
.setPriority(3);

Scan scan = new Scan(get);
assertEquals(get.getCacheBlocks(), scan.getCacheBlocks());
Expand All @@ -100,6 +101,7 @@ public void testGetToScan() throws Exception {
assertEquals(get.getACL(), scan.getACL());
assertEquals(get.getAuthorizations().getLabels(), scan.getAuthorizations().getLabels());
assertEquals(get.getPriority(), scan.getPriority());
assertEquals(get.isQueryMetricsEnabled(), scan.isQueryMetricsEnabled());
}

@Test
Expand Down Expand Up @@ -216,7 +218,7 @@ public void testScanCopyConstructor() throws Exception {
.setReplicaId(3).setReversed(true).setRowOffsetPerColumnFamily(5)
.setStartStopRowForPrefixScan(Bytes.toBytes("row_")).setScanMetricsEnabled(true)
.setReadType(ReadType.STREAM).withStartRow(Bytes.toBytes("row_1"))
.withStopRow(Bytes.toBytes("row_2")).setTimeRange(0, 13);
.withStopRow(Bytes.toBytes("row_2")).setTimeRange(0, 13).setQueryMetricsEnabled(true);

// create a copy of existing scan object
Scan scanCopy = new Scan(scan);
Expand Down Expand Up @@ -252,6 +254,7 @@ public void testScanCopyConstructor() throws Exception {
assertEquals(scan.getStartRow(), scanCopy.getStartRow());
assertEquals(scan.getStopRow(), scanCopy.getStopRow());
assertEquals(scan.getTimeRange(), scanCopy.getTimeRange());
assertEquals(scan.isQueryMetricsEnabled(), scanCopy.isQueryMetricsEnabled());

assertTrue("Make sure copy constructor adds all the fields in the copied object",
EqualsBuilder.reflectionEquals(scan, scanCopy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void testGet() throws IOException {
getBuilder.setMaxVersions(1);
getBuilder.setCacheBlocks(true);
getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
getBuilder.setQueryMetricsEnabled(false);
Get get = ProtobufUtil.toGet(proto);
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
}
Expand Down Expand Up @@ -260,6 +261,7 @@ public void testScan() throws IOException {
scanBuilder.setCaching(1024);
scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
scanBuilder.setIncludeStopRow(false);
scanBuilder.setQueryMetricsEnabled(false);
ClientProtos.Scan expectedProto = scanBuilder.build();

ClientProtos.Scan actualProto = ProtobufUtil.toScan(ProtobufUtil.toScan(expectedProto));
Expand Down
15 changes: 15 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/client/Client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ message Get {
optional Consistency consistency = 12 [default = STRONG];
repeated ColumnFamilyTimeRange cf_time_range = 13;
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */
optional bool query_metrics_enabled = 15 [default = false];
}

message Result {
Expand Down Expand Up @@ -117,6 +118,9 @@ message Result {
// to form a complete result. The equivalent flag in o.a.h.h.client.Result is
// mayHaveMoreCellsInRow.
optional bool partial = 5 [default = false];

// Server side metrics about the result
optional QueryMetrics metrics = 6;
}

/**
Expand Down Expand Up @@ -274,6 +278,7 @@ message Scan {
}
optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
optional bool query_metrics_enabled = 25 [default = false];
}

/**
Expand Down Expand Up @@ -366,6 +371,9 @@ message ScanResponse {
// If the Scan need cursor, return the row key we are scanning in heartbeat message.
// If the Scan doesn't need a cursor, don't set this field to reduce network IO.
optional Cursor cursor = 12;

// List of QueryMetrics that maps 1:1 to the results in the response based on index
repeated QueryMetrics query_metrics = 13;
}

/**
Expand Down Expand Up @@ -458,6 +466,13 @@ message RegionAction {
optional Condition condition = 4;
}

/*
* Statistics about the Result's server-side metrics
*/
message QueryMetrics {
optional uint64 block_bytes_scanned = 1;
}

/*
* Statistics about the current load on the region
*/
Expand Down
Loading

0 comments on commit def248c

Please sign in to comment.