From b4d24235061e6c8e2ea8111eedd2a721b47d5c64 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 29 Dec 2020 14:10:14 +0800 Subject: [PATCH 1/3] HBASE-23898 Add trace support for simple apis in async client --- hbase-client/pom.xml | 10 + .../hadoop/hbase/client/AsyncConnection.java | 4 +- .../hbase/client/AsyncConnectionImpl.java | 114 ++--- .../hbase/client/AsyncRegionLocator.java | 166 ++++--- .../hadoop/hbase/client/AsyncTable.java | 41 +- .../client/AsyncTableRegionLocatorImpl.java | 18 +- .../hbase/client/ConnectionFactory.java | 55 +-- .../hbase/client/RawAsyncTableImpl.java | 360 ++++++++------- .../hadoop/hbase/ipc/AbstractRpcClient.java | 9 +- .../client/TestAsyncRegionLocatorTracing.java | 157 +++++++ .../hbase/client/TestAsyncTableTracing.java | 414 ++++++++++++++++++ .../apache/hadoop/hbase/trace/TraceUtil.java | 134 ++++++ .../apache/hadoop/hbase/ipc/CallRunner.java | 7 +- .../hadoop/hbase/ipc/ServerRpcConnection.java | 3 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 24 +- pom.xml | 3 +- 16 files changed, 1173 insertions(+), 346 deletions(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index 45cdf88ff734..5907a0bbace1 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -144,6 +144,16 @@ org.jruby.joni joni + + io.opentelemetry + opentelemetry-sdk + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + org.slf4j jcl-over-slf4j diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 0546520bbf48..8839eda802a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -64,8 +64,8 @@ public interface AsyncConnection extends Closeable { /** * Retrieve an {@link AsyncTable} implementation for accessing a table. *

- * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if - * you want to customize some configs. + * The returned instance will use default configs. Use {@link #getTableBuilder(TableName)} if you + * want to customize some configs. *

* This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 83beaf1f40ba..334739af731e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -27,9 +27,10 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; +import java.io.Closeable; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -53,14 +54,16 @@ import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.ConcurrentMapUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -123,7 +126,7 @@ class AsyncConnectionImpl implements AsyncConnection { private volatile ConnectionOverAsyncConnection conn; public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, - SocketAddress localAddress, User user) { + SocketAddress localAddress, User user) { this.conf = conf; this.user = user; @@ -137,8 +140,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri } else { this.metrics = Optional.empty(); } - this.rpcClient = RpcClientFactory.createClient( - conf, clusterId, localAddress, metrics.orElse(null)); + this.rpcClient = + RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null)); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); @@ -161,14 +164,13 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); } else { try { - listener = new ClusterStatusListener( - new ClusterStatusListener.DeadServerHandler() { - @Override - public void newDead(ServerName sn) { - locator.clearCache(sn); - rpcClient.cancelConnections(sn); - } - }, conf, listenerClass); + listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() { + @Override + public void newDead(ServerName sn) { + locator.clearCache(sn); + rpcClient.cancelConnections(sn); + } + }, conf, listenerClass); } catch (IOException e) { LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); } @@ -205,24 +207,26 @@ public boolean isClosed() { @Override public void close() { - if (!closed.compareAndSet(false, true)) { - return; - } - LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); - if(LOG.isDebugEnabled()){ - logCallStack(Thread.currentThread().getStackTrace()); - } - IOUtils.closeQuietly(clusterStatusListener); - IOUtils.closeQuietly(rpcClient); - IOUtils.closeQuietly(registry); - if (choreService != null) { - choreService.shutdown(); - } - metrics.ifPresent(MetricsConnection::shutdown); - ConnectionOverAsyncConnection c = this.conn; - if (c != null) { - c.closePool(); - } + TraceUtil.trace(() -> { + if (!closed.compareAndSet(false, true)) { + return; + } + LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); + if (LOG.isDebugEnabled()) { + logCallStack(Thread.currentThread().getStackTrace()); + } + IOUtils.closeQuietly(clusterStatusListener); + IOUtils.closeQuietly(rpcClient); + IOUtils.closeQuietly(registry); + if (choreService != null) { + choreService.shutdown(); + } + metrics.ifPresent(MetricsConnection::shutdown); + ConnectionOverAsyncConnection c = this.conn; + if (c != null) { + c.closePool(); + } + }, "AsyncConnection.close"); } private void logCallStack(StackTraceElement[] stackTraceElements) { @@ -336,7 +340,7 @@ public AsyncTable build() { @Override public AsyncTableBuilder getTableBuilder(TableName tableName, - ExecutorService pool) { + ExecutorService pool) { return new AsyncTableBuilderBase(tableName, connConf) { @Override @@ -377,7 +381,7 @@ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName @Override public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, - ExecutorService pool) { + ExecutorService pool) { return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), RETRY_TIMER); } @@ -401,28 +405,36 @@ public Connection toConnection() { @Override public CompletableFuture getHbck() { - CompletableFuture future = new CompletableFuture<>(); - addListener(registry.getActiveMaster(), (sn, error) -> { - if (error != null) { - future.completeExceptionally(error); - } else { - try { - future.complete(getHbck(sn)); - } catch (IOException e) { - future.completeExceptionally(e); + return TraceUtil.tracedFuture(() -> { + CompletableFuture future = new CompletableFuture<>(); + addListener(registry.getActiveMaster(), (sn, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + try { + future.complete(getHbck(sn)); + } catch (IOException e) { + future.completeExceptionally(e); + } } - } - }); - return future; + }); + return future; + }, getClass().getName() + ".getHbck"); } @Override public Hbck getHbck(ServerName masterServer) throws IOException { - // we will not create a new connection when creating a new protobuf stub, and for hbck there - // will be no performance consideration, so for simplification we will create a new stub every - // time instead of caching the stub here. - return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( - rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); + Span span = TraceUtil.createSpan(getClass().getName() + ".getHbck") + .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName()); + try (Scope scope = span.makeCurrent()) { + // we will not create a new connection when creating a new protobuf stub, and for hbck there + // will be no performance consideration, so for simplification we will create a new stub every + // time instead of caching the stub here. + return new HBaseHbck( + MasterProtos.HbckService + .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), + rpcControllerFactory); + } } Optional getConnectionMetrics() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 215a1c58faa1..450c32433a99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -18,16 +18,28 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.trace.TraceUtil.REGION_NAMES_KEY; +import static org.apache.hadoop.hbase.trace.TraceUtil.SERVER_NAME_KEY; +import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan; +import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -61,7 +73,7 @@ class AsyncRegionLocator { } private CompletableFuture withTimeout(CompletableFuture future, long timeoutNs, - Supplier timeoutMsg) { + Supplier timeoutMsg) { if (future.isDone() || timeoutNs <= 0) { return future; } @@ -84,64 +96,101 @@ private boolean isMeta(TableName tableName) { return TableName.isMetaTableName(tableName); } + private CompletableFuture tracedLocationFuture(Supplier> action, + Function> getRegionNames, TableName tableName, String methodName) { + Span span = createTableSpan(getClass().getSimpleName() + "." + methodName, tableName); + try (Scope scope = span.makeCurrent()) { + CompletableFuture future = action.get(); + FutureUtils.addListener(future, (resp, error) -> { + if (error != null) { + span.recordException(error); + span.setStatus(StatusCode.ERROR); + } else { + List regionNames = getRegionNames.apply(resp); + if (!regionNames.isEmpty()) { + span.setAttribute(REGION_NAMES_KEY, regionNames); + } + span.setStatus(StatusCode.OK); + } + span.end(); + }); + return future; + } + } + + private List getRegionName(RegionLocations locs) { + List names = new ArrayList<>(); + for (HRegionLocation loc : locs.getRegionLocations()) { + if (loc != null) { + names.add(loc.getRegion().getRegionNameAsString()); + } + } + return names; + } + CompletableFuture getRegionLocations(TableName tableName, byte[] row, - RegionLocateType type, boolean reload, long timeoutNs) { - CompletableFuture future = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) - : nonMetaRegionLocator.getRegionLocations(tableName, row, - RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); - return withTimeout(future, timeoutNs, - () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + - "ms) waiting for region locations for " + tableName + ", row='" + - Bytes.toStringBinary(row) + "'"); + RegionLocateType type, boolean reload, long timeoutNs) { + return tracedLocationFuture(() -> { + CompletableFuture future = isMeta(tableName) ? + metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) : + nonMetaRegionLocator.getRegionLocations(tableName, row, + RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region locations for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "'"); + }, this::getRegionName, tableName, "getRegionLocations"); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, - int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { - // meta region can not be split right now so we always call the same method. - // Change it later if the meta table can have more than one regions. - CompletableFuture future = new CompletableFuture<>(); - CompletableFuture locsFuture = - isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) - : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); - addListener(locsFuture, (locs, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - HRegionLocation loc = locs.getRegionLocation(replicaId); - if (loc == null) { - future.completeExceptionally( - new RegionOfflineException("No location for " + tableName + ", row='" + - Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); - } else if (loc.getServerName() == null) { - future.completeExceptionally( - new RegionOfflineException("No server address listed for region '" + - loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + - "', locateType=" + type + ", replicaId=" + replicaId)); - } else { - future.complete(loc); - } - }); - return withTimeout(future, timeoutNs, - () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + - "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + - "', replicaId=" + replicaId); + int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { + return tracedLocationFuture(() -> { + // meta region can not be split right now so we always call the same method. + // Change it later if the meta table can have more than one regions. + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture locsFuture = + isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) : + nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); + addListener(locsFuture, (locs, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + future.completeExceptionally( + new RegionOfflineException("No location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); + } else if (loc.getServerName() == null) { + future.completeExceptionally( + new RegionOfflineException("No server address listed for region '" + + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + + "', locateType=" + type + ", replicaId=" + replicaId)); + } else { + future.complete(loc); + } + }); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "', replicaId=" + replicaId); + }, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName, + "getRegionLocation"); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, - int replicaId, RegionLocateType type, long timeoutNs) { + int replicaId, RegionLocateType type, long timeoutNs) { return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, - RegionLocateType type, boolean reload, long timeoutNs) { + RegionLocateType type, boolean reload, long timeoutNs) { return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload, timeoutNs); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, - RegionLocateType type, long timeoutNs) { + RegionLocateType type, long timeoutNs) { return getRegionLocation(tableName, row, type, false, timeoutNs); } @@ -154,24 +203,31 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { } void clearCache(TableName tableName) { - LOG.debug("Clear meta cache for {}", tableName); - if (tableName.equals(META_TABLE_NAME)) { - metaRegionLocator.clearCache(); - } else { - nonMetaRegionLocator.clearCache(tableName); - } + TraceUtil.trace(() -> { + LOG.debug("Clear meta cache for {}", tableName); + if (tableName.equals(META_TABLE_NAME)) { + metaRegionLocator.clearCache(); + } else { + nonMetaRegionLocator.clearCache(tableName); + } + }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName)); } void clearCache(ServerName serverName) { - LOG.debug("Clear meta cache for {}", serverName); - metaRegionLocator.clearCache(serverName); - nonMetaRegionLocator.clearCache(serverName); - conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); + TraceUtil.trace(() -> { + LOG.debug("Clear meta cache for {}", serverName); + metaRegionLocator.clearCache(serverName); + nonMetaRegionLocator.clearCache(serverName); + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); + }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY, + serverName.getServerName())); } void clearCache() { - metaRegionLocator.clearCache(); - nonMetaRegionLocator.clearCache(); + TraceUtil.trace(() -> { + metaRegionLocator.clearCache(); + nonMetaRegionLocator.clearCache(); + }, "AsyncRegionLocator.clearCache"); } AsyncNonMetaRegionLocator getNonMetaRegionLocator() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index b390909d3696..df25351e1017 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -70,6 +70,7 @@ public interface AsyncTable { * Gets the {@link AsyncTableRegionLocator} for this table. */ AsyncTableRegionLocator getRegionLocator(); + /** * Get timeout of each rpc request in this Table instance. It will be overridden by a more * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. @@ -184,7 +185,7 @@ default CompletableFuture exists(Get get) { * {@link CompletableFuture}. */ default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) { + long amount) { return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); } @@ -204,12 +205,12 @@ default CompletableFuture incrementColumnValue(byte[] row, byte[] family, * {@link CompletableFuture}. */ default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, Durability durability) { + long amount, Durability durability) { Preconditions.checkNotNull(row, "row is null"); Preconditions.checkNotNull(family, "family is null"); return increment( new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) - .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); + .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); } /** @@ -233,16 +234,15 @@ default CompletableFuture incrementColumnValue(byte[] row, byte[] family, * * * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it - * any more. + * any more. */ @Deprecated CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); /** * A helper class for sending checkAndMutate request. - * * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it - * any more. + * any more. */ @Deprecated interface CheckAndMutateBuilder { @@ -319,16 +319,15 @@ default CheckAndMutateBuilder ifEquals(byte[] value) { * * * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it - * any more. + * any more. */ @Deprecated CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); /** * A helper class for sending checkAndMutate request with a filter. - * * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it - * any more. + * any more. */ @Deprecated interface CheckAndMutateWithFilterBuilder { @@ -361,9 +360,8 @@ interface CheckAndMutateWithFilterBuilder { } /** - * checkAndMutate that atomically checks if a row matches the specified condition. If it does, - * it performs the specified action. - * + * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it + * performs the specified action. * @param checkAndMutate The CheckAndMutate object. * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. */ @@ -373,22 +371,19 @@ interface CheckAndMutateWithFilterBuilder { * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed * atomically (and thus, each may fail independently of others). - * * @param checkAndMutates The list of CheckAndMutate. - * @return A list of {@link CompletableFuture}s that represent the result for each - * CheckAndMutate. + * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate. */ - List> checkAndMutate( - List checkAndMutates); + List> + checkAndMutate(List checkAndMutates); /** * A simple version of batch checkAndMutate. It will fail if there are any failures. - * * @param checkAndMutates The list of rows to apply. * @return A {@link CompletableFuture} that wrapper the result list. */ - default CompletableFuture> checkAndMutateAll( - List checkAndMutates) { + default CompletableFuture> + checkAndMutateAll(List checkAndMutates) { return allOf(checkAndMutate(checkAndMutates)); } @@ -484,7 +479,7 @@ default ResultScanner getScanner(byte[] family, byte[] qualifier) { */ default List> exists(List gets) { return get(toCheckExistenceOnly(gets)).stream() - .> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); + .> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); } /** @@ -592,7 +587,7 @@ default CompletableFuture> batchAll(List actions) { * @see ServiceCaller */ CompletableFuture coprocessorService(Function stubMaker, - ServiceCaller callable, byte[] row); + ServiceCaller callable, byte[] row); /** * The callback when we want to execute a coprocessor call on a range of regions. @@ -731,5 +726,5 @@ default CoprocessorServiceBuilder toRow(byte[] endKey) { * for more details. */ CoprocessorServiceBuilder coprocessorService(Function stubMaker, - ServiceCaller callable, CoprocessorCallback callback); + ServiceCaller callable, CoprocessorCallback callback); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index ad6a051208f9..35bf0e0ea330 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; + import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -47,19 +49,21 @@ public TableName getName() { @Override public CompletableFuture getRegionLocation(byte[] row, int replicaId, - boolean reload) { + boolean reload) { return conn.getLocator().getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, -1L); } @Override public CompletableFuture> getAllRegionLocations() { - if (TableName.isMetaTableName(tableName)) { - return conn.registry.getMetaRegionLocations() - .thenApply(locs -> Arrays.asList(locs.getRegionLocations())); - } - return ClientMetaTableAccessor - .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName); + return tracedFuture(() -> { + if (TableName.isMetaTableName(tableName)) { + return conn.registry.getMetaRegionLocations() + .thenApply(locs -> Arrays.asList(locs.getRegionLocations())); + } + return ClientMetaTableAccessor + .getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName); + }, getClass().getSimpleName() + ".getAllRegionLocations"); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 027b8983954d..bdf438241ca5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -277,31 +278,33 @@ public static CompletableFuture createAsyncConnection(Configura * @return AsyncConnection object wrapped by CompletableFuture */ public static CompletableFuture createAsyncConnection(Configuration conf, - final User user) { - CompletableFuture future = new CompletableFuture<>(); - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); - addListener(registry.getClusterId(), (clusterId, error) -> { - if (error != null) { - registry.close(); - future.completeExceptionally(error); - return; - } - if (clusterId == null) { - registry.close(); - future.completeExceptionally(new IOException("clusterid came back null")); - return; - } - Class clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, - AsyncConnectionImpl.class, AsyncConnection.class); - try { - future.complete( - user.runAs((PrivilegedExceptionAction) () -> ReflectionUtils - .newInstance(clazz, conf, registry, clusterId, null, user))); - } catch (Exception e) { - registry.close(); - future.completeExceptionally(e); - } - }); - return future; + final User user) { + return TraceUtil.tracedFuture(() -> { + CompletableFuture future = new CompletableFuture<>(); + ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); + addListener(registry.getClusterId(), (clusterId, error) -> { + if (error != null) { + registry.close(); + future.completeExceptionally(error); + return; + } + if (clusterId == null) { + registry.close(); + future.completeExceptionally(new IOException("clusterid came back null")); + return; + } + Class clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, + AsyncConnectionImpl.class, AsyncConnection.class); + try { + future.complete( + user.runAs((PrivilegedExceptionAction) () -> ReflectionUtils + .newInstance(clazz, conf, registry, clusterId, null, user))); + } catch (Exception e) { + registry.close(); + future.completeExceptionally(e); + } + }); + return future; + }, ConnectionFactory.class.getSimpleName() + ".createAsyncConnection"); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 3cffad8b44d4..2c5bd167483e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; +import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; +import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; @@ -132,8 +134,8 @@ class RawAsyncTableImpl implements AsyncTable { } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; - this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() - : conn.connConf.getScannerCaching(); + this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() : + conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @@ -158,23 +160,23 @@ public AsyncTableRegionLocator getRegionLocator() { } private static CompletableFuture mutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, - Converter reqConvert, - Converter respConverter) { + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, + Converter respConverter) { return ConnectionUtils.call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), respConverter); } private static CompletableFuture voidMutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, - Converter reqConvert) { + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert) { return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { return null; }); } private static Result toResult(HBaseRpcController controller, MutateResponse resp) - throws IOException { + throws IOException { if (!resp.hasResult()) { return null; } @@ -187,9 +189,9 @@ private interface NoncedConverter { } private CompletableFuture noncedMutate(long nonceGroup, long nonce, - HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, - NoncedConverter reqConvert, - Converter respConverter) { + HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, + NoncedConverter reqConvert, + Converter respConverter) { return mutate(controller, loc, stub, req, (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } @@ -202,8 +204,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } - private SingleRequestCallerBuilder newCaller( - R row, long rpcTimeoutNs) { + private SingleRequestCallerBuilder + newCaller(R row, long rpcTimeoutNs) { return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); } @@ -218,50 +220,62 @@ private CompletableFuture get(Get get, int replicaId) { @Override public CompletableFuture get(Get get) { - return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), - RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, - conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()); + return tracedFuture( + () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), + RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, + conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), + "AsyncTable.get", tableName); + } + + private String getSpanName(String methodName) { + return getClass().getSimpleName() + "." + methodName; } @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); - return this. newCaller(put, writeRpcTimeoutNs) + return tracedFuture(() -> this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) - .call(); + .call(), "AsyncTable.put", tableName); } @Override public CompletableFuture delete(Delete delete) { - return this. newCaller(delete, writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, - stub, delete, RequestConverter::buildMutateRequest)) - .call(); + return tracedFuture( + () -> this. newCaller(delete, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, + stub, delete, RequestConverter::buildMutateRequest)) + .call(), + "AsyncTable.delete", tableName); } @Override public CompletableFuture append(Append append) { checkHasFamilies(append); - long nonceGroup = conn.getNonceGenerator().getNonceGroup(); - long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(append, rpcTimeoutNs) - .action( - (controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, - loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) - .call(); + return tracedFuture(() -> { + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); + return this. newCaller(append, rpcTimeoutNs) + .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, + controller, loc, stub, append, RequestConverter::buildMutateRequest, + RawAsyncTableImpl::toResult)) + .call(); + }, "AsyncTable.append", tableName); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); - long nonceGroup = conn.getNonceGenerator().getNonceGroup(); - long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(increment, rpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, - controller, loc, stub, increment, RequestConverter::buildMutateRequest, - RawAsyncTableImpl::toResult)) - .call(); + return tracedFuture(() -> { + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); + return this. newCaller(increment, rpcTimeoutNs) + .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, + controller, loc, stub, increment, RequestConverter::buildMutateRequest, + RawAsyncTableImpl::toResult)) + .call(); + }, "AsyncTable.increment", tableName); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -319,37 +333,42 @@ private void preCheck() { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); - return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, p), + (c, r) -> r.getProcessed())) + .call(), + "AsyncTable.CheckAndMutateBuilder.thenPut", tableName); } @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); - return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, d), + (c, r) -> r.getProcessed())) + .call(), + "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName); } @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); - return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), - rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, - loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, rm), CheckAndMutateResult::isSuccess)) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this + . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, + mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, rm), + CheckAndMutateResult::isSuccess)) + .call(), + "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName); } } @@ -381,35 +400,40 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); - return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, + timeRange, p), + (c, r) -> r.getProcessed())) + .call(), + "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName); } @Override public CompletableFuture thenDelete(Delete delete) { - return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, + timeRange, d), + (c, r) -> r.getProcessed())) + .call(), + "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName); } @Override public CompletableFuture thenMutate(RowMutations mutation) { - return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), - rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, - loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, rm), CheckAndMutateResult::isSuccess)) - .call(); + return tracedFuture( + () -> RawAsyncTableImpl.this + . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, + mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, + timeRange, rm), + CheckAndMutateResult::isSuccess)) + .call(), + "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName); } } @@ -423,58 +447,64 @@ public CompletableFuture checkAndMutate(CheckAndMutate che if (checkAndMutate.getAction() instanceof Put) { validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); } - if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete - || checkAndMutate.getAction() instanceof Increment - || checkAndMutate.getAction() instanceof Append) { - Mutation mutation = (Mutation) checkAndMutate.getAction(); - if (mutation instanceof Put) { - validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); + return tracedFuture(() -> { + if (checkAndMutate.getAction() instanceof Put || + checkAndMutate.getAction() instanceof Delete || + checkAndMutate.getAction() instanceof Increment || + checkAndMutate.getAction() instanceof Append) { + Mutation mutation = (Mutation) checkAndMutate.getAction(); + if (mutation instanceof Put) { + validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); + } + return RawAsyncTableImpl.this + . newCaller(checkAndMutate.getRow(), mutation.getPriority(), + rpcTimeoutNs) + .action( + (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, + (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), + checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m), + (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) + .call(); + } else if (checkAndMutate.getAction() instanceof RowMutations) { + RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); + return RawAsyncTableImpl.this + . newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), + rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this + . mutateRow(controller, loc, stub, + rowMutations, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), + checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm), + resp -> resp)) + .call(); + } else { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new DoNotRetryIOException( + "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); + return future; } - return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), - mutation.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, - loc, stub, mutation, - (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), - checkAndMutate.getFamily(), checkAndMutate.getQualifier(), - checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), - checkAndMutate.getTimeRange(), m), - (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) - .call(); - } else if (checkAndMutate.getAction() instanceof RowMutations) { - RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); - return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), - rowMutations.getMaxPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> - RawAsyncTableImpl.this. mutateRow( - controller, loc, stub, rowMutations, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), - checkAndMutate.getFamily(), checkAndMutate.getQualifier(), - checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), - checkAndMutate.getTimeRange(), rm), - resp -> resp)) - .call(); - } else { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new DoNotRetryIOException( - "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); - return future; - } + }, "AsyncTable.checkAndMutate", tableName); } @Override - public List> checkAndMutate( - List checkAndMutates) { - return batch(checkAndMutates, rpcTimeoutNs).stream() - .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()); + public List> + checkAndMutate(List checkAndMutates) { + return tracedFutures( + () -> batch(checkAndMutates, rpcTimeoutNs).stream() + .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), + "AsyncTable.checkAndMutateList", tableName); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // so here I write a new method as I do not want to change the abstraction of call method. @SuppressWarnings("unchecked") private CompletableFuture mutateRow(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, - Converter reqConvert, - Function respConverter) { + HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, + Converter reqConvert, Function respConverter) { CompletableFuture future = new CompletableFuture<>(); try { byte[] regionName = loc.getRegion().getRegionName(); @@ -493,12 +523,12 @@ public void run(MultiResponse resp) { loc.getServerName(), multiResp); Throwable ex = multiResp.getException(regionName); if (ex != null) { - future.completeExceptionally(ex instanceof IOException ? ex - : new IOException( + future.completeExceptionally(ex instanceof IOException ? ex : + new IOException( "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); } else { - future.complete(respConverter - .apply((RES) multiResp.getResults().get(regionName).result.get(0))); + future.complete( + respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); } } catch (IOException e) { future.completeExceptionally(e); @@ -514,17 +544,15 @@ public void run(MultiResponse resp) { @Override public CompletableFuture mutateRow(RowMutations mutations) { - return this. newCaller(mutations.getRow(), mutations.getMaxPriority(), - writeRpcTimeoutNs).action((controller, loc, stub) -> - this. mutateRow(controller, loc, stub, mutations, - (rn, rm) -> { - RegionAction.Builder regionMutationBuilder = RequestConverter - .buildRegionAction(rn, rm); - regionMutationBuilder.setAtomic(true); - return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()) - .build(); - }, resp -> resp)) - .call(); + return tracedFuture(() -> this + . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. mutateRow(controller, loc, stub, + mutations, (rn, rm) -> { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); + regionMutationBuilder.setAtomic(true); + return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + }, resp -> resp)) + .call(), "AsyncTable.mutateRow", tableName); } private Scan setDefaultScanConfig(Scan scan) { @@ -560,46 +588,48 @@ public ResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { - CompletableFuture> future = new CompletableFuture<>(); - List scanResults = new ArrayList<>(); - scan(scan, new AdvancedScanResultConsumer() { + return tracedFuture(() -> { + CompletableFuture> future = new CompletableFuture<>(); + List scanResults = new ArrayList<>(); + scan(scan, new AdvancedScanResultConsumer() { - @Override - public void onNext(Result[] results, ScanController controller) { - scanResults.addAll(Arrays.asList(results)); - } + @Override + public void onNext(Result[] results, ScanController controller) { + scanResults.addAll(Arrays.asList(results)); + } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); - } + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } - @Override - public void onComplete() { - future.complete(scanResults); - } - }); - return future; + @Override + public void onComplete() { + future.complete(scanResults); + } + }); + return future; + }, "AsyncTable.scanAll", tableName); } @Override public List> get(List gets) { - return batch(gets, readRpcTimeoutNs); + return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName); } @Override public List> put(List puts) { - return voidMutate(puts); + return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName); } @Override public List> delete(List deletes) { - return voidMutate(deletes); + return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName); } @Override public List> batch(List actions) { - return batch(actions, rpcTimeoutNs); + return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName); } private List> voidMutate(List actions) { @@ -651,7 +681,7 @@ public long getScanTimeout(TimeUnit unit) { } private CompletableFuture coprocessorService(Function stubMaker, - ServiceCaller callable, RegionInfo region, byte[] row) { + ServiceCaller callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs); S stub = stubMaker.apply(channel); @@ -669,7 +699,7 @@ private CompletableFuture coprocessorService(Function s @Override public CompletableFuture coprocessorService(Function stubMaker, - ServiceCaller callable, byte[] row) { + ServiceCaller callable, byte[] row) { return coprocessorService(stubMaker, callable, null, row); } @@ -691,9 +721,9 @@ private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyI } private void onLocateComplete(Function stubMaker, - ServiceCaller callable, CoprocessorCallback callback, List locs, - byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, - AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { + ServiceCaller callable, CoprocessorCallback callback, List locs, + byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, + AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { if (error != null) { callback.onError(error); return; @@ -722,7 +752,7 @@ private void onLocateComplete(Function stubMaker, } private final class CoprocessorServiceBuilderImpl - implements CoprocessorServiceBuilder { + implements CoprocessorServiceBuilder { private final Function stubMaker; @@ -739,7 +769,7 @@ private final class CoprocessorServiceBuilderImpl private boolean endKeyInclusive; public CoprocessorServiceBuilderImpl(Function stubMaker, - ServiceCaller callable, CoprocessorCallback callback) { + ServiceCaller callable, CoprocessorCallback callback) { this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); this.callable = Preconditions.checkNotNull(callable, "callable is null"); this.callback = Preconditions.checkNotNull(callback, "callback is null"); @@ -776,8 +806,8 @@ public void execute() { @Override public CoprocessorServiceBuilder coprocessorService( - Function stubMaker, ServiceCaller callable, - CoprocessorCallback callback) { + Function stubMaker, ServiceCaller callable, + CoprocessorCallback callback) { return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 3acc6c1a65bb..08445aa8d405 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -393,10 +393,11 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, } private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, - final Message param, Message returnType, final User ticket, - final Address addr, final RpcCallback callback) { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName()) - .startSpan(); + final Message param, Message returnType, final User ticket, final Address addr, + final RpcCallback callback) { + Span span = TraceUtil.createSpan("RpcClient.callMethod") + .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName()) + .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName()); try (Scope scope = span.makeCurrent()) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java new file mode 100644 index 000000000000..b629bbfb977b --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncRegionLocatorTracing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private AsyncConnectionImpl conn; + + private RegionLocations locs; + + @Rule + public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + + @Before + public void setUp() throws IOException { + RegionInfo metaRegionInfo = RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build(); + locs = new RegionLocations( + new HRegionLocation(metaRegionInfo, + ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis())), + new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 1), + ServerName.valueOf("127.0.0.2", 12345, System.currentTimeMillis())), + new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 2), + ServerName.valueOf("127.0.0.3", 12345, System.currentTimeMillis()))); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) { + + @Override + public CompletableFuture getMetaRegionLocations() { + return CompletableFuture.completedFuture(locs); + } + }, "test", null, UserProvider.instantiate(CONF).getCurrent()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(conn, true); + } + + private SpanData waitSpan(String name) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name))); + return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); + } + + @Test + public void testClearCache() { + conn.getLocator().clearCache(); + SpanData span = waitSpan("AsyncRegionLocator.clearCache"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + } + + @Test + public void testClearCacheServerName() { + ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis()); + conn.getLocator().clearCache(sn); + SpanData span = waitSpan("AsyncRegionLocator.clearCache"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(sn.toString(), span.getAttributes().get(TraceUtil.SERVER_NAME_KEY)); + } + + @Test + public void testClearCacheTableName() { + conn.getLocator().clearCache(TableName.META_TABLE_NAME); + SpanData span = waitSpan("AsyncRegionLocator.clearCache"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + } + + @Test + public void testGetRegionLocation() { + conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, + RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join(); + SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + List regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY); + assertEquals(1, regionNames.size()); + assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(), + regionNames.get(0)); + } + + @Test + public void testGetRegionLocations() { + conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, + RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join(); + SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations"); + assertEquals(StatusCode.OK, span.getStatus().getStatusCode()); + assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(), + span.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(TableName.META_TABLE_NAME.getNameAsString(), + span.getAttributes().get(TraceUtil.TABLE_KEY)); + List regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY); + assertEquals(3, regionNames.size()); + for (int i = 0; i < 3; i++) { + assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(), + regionNames.get(i)); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java new file mode 100644 index 000000000000..18a38293db4d --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Span.Kind; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Cell.Type; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncTableTracing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableTracing.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private ClientService.Interface stub; + + private AsyncConnection conn; + + private AsyncTable table; + + @Rule + public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + + @Before + public void setUp() throws IOException { + stub = mock(ClientService.Interface.class); + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + if (req.hasCloseScanner() && req.getCloseScanner()) { + done.run(ScanResponse.getDefaultInstance()); + } else { + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) + .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) + .setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) + .addResults(ProtobufUtil.toResult(result)); + if (req.getLimitOfRows() == 1) { + builder.setMoreResultsInRegion(false).setMoreResults(false); + } else { + builder.setMoreResultsInRegion(true).setMoreResults(true); + } + ForkJoinPool.commonPool().execute(() -> done.run(builder.build())); + } + } + return null; + } + }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ClientProtos.MultiResponse resp = + ClientProtos.MultiResponse.newBuilder() + .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( + ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) + .build(); + RpcCallback done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(resp)); + return null; + } + }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); + MutateResponse resp; + switch (req.getMutateType()) { + case INCREMENT: + ColumnValue value = req.getColumnValue(0); + QualifierValue qvalue = value.getQualifierValue(0); + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) + .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) + .setQualifier(qvalue.getQualifier().toByteArray()) + .setValue(qvalue.getValue().toByteArray()).build(); + resp = MutateResponse.newBuilder() + .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); + break; + default: + resp = MutateResponse.getDefaultInstance(); + break; + } + RpcCallback done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(resp)); + return null; + } + }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance())); + return null; + } + }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null, + UserProvider.instantiate(CONF).getCurrent()) { + + @Override + AsyncRegionLocator getLocator() { + AsyncRegionLocator locator = mock(AsyncRegionLocator.class); + Answer> answer = + new Answer>() { + + @Override + public CompletableFuture answer(InvocationOnMock invocation) + throws Throwable { + TableName tableName = invocation.getArgument(0); + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ServerName serverName = ServerName.valueOf("rs", 16010, 12345); + HRegionLocation loc = new HRegionLocation(info, serverName); + return CompletableFuture.completedFuture(loc); + } + }; + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + any(RegionLocateType.class), anyLong()); + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + anyInt(), any(RegionLocateType.class), anyLong()); + return locator; + } + + @Override + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + return stub; + } + }; + table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(conn, true); + } + + private void assertTrace(String methodName) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream() + .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) && + span.getKind() == Kind.INTERNAL && span.hasEnded())); + SpanData data = traceRule.getSpans().stream() + .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get(); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + TableName tableName = table.getName(); + assertEquals(tableName.getNamespaceAsString(), + data.getAttributes().get(TraceUtil.NAMESPACE_KEY)); + assertEquals(tableName.getNameAsString(), data.getAttributes().get(TraceUtil.TABLE_KEY)); + } + + @Test + public void testExists() { + table.exists(new Get(Bytes.toBytes(0))).join(); + assertTrace("get"); + } + + @Test + public void testGet() { + table.get(new Get(Bytes.toBytes(0))).join(); + assertTrace("get"); + } + + @Test + public void testPut() { + table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))).join(); + assertTrace("put"); + } + + @Test + public void testDelete() { + table.delete(new Delete(Bytes.toBytes(0))).join(); + assertTrace("delete"); + } + + @Test + public void testAppend() { + table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))).join(); + assertTrace("append"); + } + + @Test + public void testIncrement() { + table + .increment( + new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) + .join(); + assertTrace("increment"); + } + + @Test + public void testIncrementColumnValue1() { + table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) + .join(); + assertTrace("increment"); + } + + @Test + public void testIncrementColumnValue2() { + table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, + Durability.ASYNC_WAL).join(); + assertTrace("increment"); + } + + @Test + public void testCheckAndMutate() { + table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("checkAndMutate"); + } + + @Test + public void testCheckAndMutateList() { + CompletableFuture + .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("checkAndMutateList"); + } + + @Test + public void testCheckAndMutateAll() { + table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) + .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) + .build(new Delete(Bytes.toBytes(0))))).join(); + assertTrace("checkAndMutateList"); + } + + @Test + public void testMutateRow() throws IOException { + table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0)))); + assertTrace("mutateRow"); + } + + @Test + public void testScanAll() throws IOException { + table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); + assertTrace("scanAll"); + } + + @Test + public void testExistsList() { + CompletableFuture + .allOf( + table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("getList"); + } + + @Test + public void testExistsAll() { + table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); + assertTrace("getList"); + } + + @Test + public void testGetList() { + CompletableFuture + .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("getList"); + } + + @Test + public void testGetAll() { + table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); + assertTrace("getList"); + } + + @Test + public void testPutList() { + CompletableFuture + .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("putList"); + } + + @Test + public void testPutAll() { + table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); + assertTrace("putList"); + } + + @Test + public void testDeleteList() { + CompletableFuture + .allOf( + table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("deleteList"); + } + + @Test + public void testDeleteAll() { + table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("deleteList"); + } + + @Test + public void testBatch() { + CompletableFuture + .allOf( + table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) + .join(); + assertTrace("batch"); + } + + @Test + public void testBatchAll() { + table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + assertTrace("batch"); + } + + @Test + public void testConnClose() throws IOException { + conn.close(); + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream() + .anyMatch(span -> span.getName().equals("AsyncConnection.close") && + span.getKind() == Kind.INTERNAL && span.hasEnded())); + SpanData data = traceRule.getSpans().stream() + .filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get(); + assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 768de9c3b9ca..d0da07103131 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -18,7 +18,19 @@ package org.apache.hadoop.hbase.trace; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Span.Kind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.attributes.SemanticAttributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -26,10 +38,132 @@ public final class TraceUtil { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.contrib.hbase"; + public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; + + public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); + + public static final AttributeKey> REGION_NAMES_KEY = + AttributeKey.stringArrayKey("db.hbase.regions"); + + public static final AttributeKey RPC_SERVICE_KEY = + AttributeKey.stringKey("db.hbase.rpc.service"); + + public static final AttributeKey RPC_METHOD_KEY = + AttributeKey.stringKey("db.hbase.rpc.method"); + + public static final AttributeKey SERVER_NAME_KEY = + AttributeKey.stringKey("db.hbase.server.name"); + private TraceUtil() { } public static Tracer getGlobalTracer() { return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); } + + /** + * Create a {@link Kind#INTERNAL} span. + */ + public static Span createSpan(String name) { + return createSpan(name, Kind.INTERNAL); + } + + /** + * Create a {@link Kind#INTERNAL} span and set table related attributes. + */ + public static Span createTableSpan(String spanName, TableName tableName) { + return createSpan(spanName).setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString()) + .setAttribute(TABLE_KEY, tableName.getNameAsString()); + } + + /** + * Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one + * {@link Kind#CLIENT} span and one {@link Kind#SERVER} span for a traced request, so use this + * with caution when you want to create spans with kind other than {@link Kind#INTERNAL}. + */ + private static Span createSpan(String name, Kind kind) { + return getGlobalTracer().spanBuilder(name).setSpanKind(kind).startSpan(); + } + + /** + * Create a span which parent is from remote, i.e, passed through rpc. + *

+ * We will set the kind of the returned span to {@link Kind#SERVER}, as this should be the top + * most span at server side. + */ + public static Span createRemoteSpan(String name, Context ctx) { + return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan(); + } + + /** + * Trace an asynchronous operation for a table. + */ + public static CompletableFuture tracedFuture(Supplier> action, + String spanName, TableName tableName) { + Span span = createTableSpan(spanName, tableName); + try (Scope scope = span.makeCurrent()) { + CompletableFuture future = action.get(); + endSpan(future, span); + return future; + } + } + + /** + * Trace an asynchronous operation. + */ + public static CompletableFuture tracedFuture(Supplier> action, + String spanName) { + Span span = createSpan(spanName); + try (Scope scope = span.makeCurrent()) { + CompletableFuture future = action.get(); + endSpan(future, span); + return future; + } + } + + /** + * Trace an asynchronous operation, and finish the create {@link Span} when all the given + * {@code futures} are completed. + */ + public static List> tracedFutures( + Supplier>> action, String spanName, TableName tableName) { + Span span = createTableSpan(spanName, tableName); + try (Scope scope = span.makeCurrent()) { + List> futures = action.get(); + endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span); + return futures; + } + } + + /** + * Finish the {@code span} when the given {@code future} is completed. + */ + private static void endSpan(CompletableFuture future, Span span) { + FutureUtils.addListener(future, (resp, error) -> { + if (error != null) { + span.recordException(error); + span.setStatus(StatusCode.ERROR); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); + }); + } + + public static void trace(Runnable action, String spanName) { + trace(action, () -> createSpan(spanName)); + } + + public static void trace(Runnable action, Supplier creator) { + Span span = creator.get(); + try (Scope scope = span.makeCurrent()) { + action.run(); + span.setStatus(StatusCode.OK); + } catch (Throwable e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR); + } finally { + span.end(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index fe9d139f1edb..48eb28ef3d30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -121,9 +121,10 @@ public void run() { RpcServer.CurCall.set(call); String serviceName = getServiceName(); String methodName = getMethodName(); - String traceString = serviceName + "." + methodName; - Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString) - .setParent(Context.current().with(((ServerCall) call).getSpan())).startSpan(); + Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod") + .setParent(Context.current().with(((ServerCall) call).getSpan())).startSpan() + .setAttribute(TraceUtil.RPC_SERVICE_KEY, serviceName) + .setAttribute(TraceUtil.RPC_METHOD_KEY, methodName); try (Scope traceScope = span.makeCurrent()) { if (!this.rpcServer.isStarted()) { InetSocketAddress address = rpcServer.getListenerAddress(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 76541678ec49..b5f4ad926b85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -629,8 +629,7 @@ public String get(RPCTInfo carrier, String key) { }; Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() .extract(Context.current(), header.getTraceInfo(), getter); - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan(); + Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); try (Scope scope = span.makeCurrent()) { int id = header.getCallId(); if (RpcServer.LOG.isTraceEnabled()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 11978ca6c8e6..4aca76400311 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; @@ -448,6 +449,19 @@ private void assertSameTraceId() { } } + private SpanData waitSpan(String name) { + Waiter.waitFor(CONF, 1000, + () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name))); + return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); + } + + private void assertRpcAttribute(SpanData data, String methodName) { + assertEquals(SERVICE.getDescriptorForType().getName(), + data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY)); + assertEquals(methodName, + data.getAttributes().get(TraceUtil.RPC_METHOD_KEY)); + } + @Test public void testTracing() throws IOException, ServiceException { RpcServer rpcServer = createRpcServer(null, "testRpcServer", @@ -457,9 +471,8 @@ public void testTracing() throws IOException, ServiceException { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause"))); - + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause"); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause"); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertThat( @@ -471,9 +484,8 @@ public void testTracing() throws IOException, ServiceException { traceRule.clearSpans(); assertThrows(ServiceException.class, () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error"))); - + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error"); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error"); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); diff --git a/pom.xml b/pom.xml index da1f7e476bdb..c697e80063c6 100755 --- a/pom.xml +++ b/pom.xml @@ -1653,7 +1653,6 @@ 4.13 1.3 0.13.1 - 0.13.0 1.2.17 2.28.2 0.6.1 @@ -2337,7 +2336,7 @@ io.opentelemetry.javaagent opentelemetry-javaagent - ${opentelemetry-instrumentation.version} + ${opentelemetry.version} all From 0b0c3d01568070bcf86a9f28409e2687481eafa5 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 29 Dec 2020 16:06:02 +0800 Subject: [PATCH 2/3] fix checkstyle issue --- .../org/apache/hadoop/hbase/client/AsyncConnectionImpl.java | 2 -- .../org/apache/hadoop/hbase/client/TestAsyncTableTracing.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 334739af731e..987ac43d7f84 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -29,7 +29,6 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -import java.io.Closeable; import java.io.IOException; import java.net.SocketAddress; import java.util.Optional; @@ -62,7 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index 18a38293db4d..dffd2f9c9a99 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -24,8 +24,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Span.Kind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; From c8a51ec0147d0b3a47980d68bc0a73ad0dc7283a Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 30 Dec 2020 14:56:54 +0800 Subject: [PATCH 3/3] remove unused method --- .../org/apache/hadoop/hbase/client/RawAsyncTableImpl.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 2c5bd167483e..11d22cd067ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -227,10 +227,6 @@ public CompletableFuture get(Get get) { "AsyncTable.get", tableName); } - private String getSpanName(String methodName) { - return getClass().getSimpleName() + "." + methodName; - } - @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize());