diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 3af574cfc0b2..4900581c69ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -31,6 +31,8 @@
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,6 +129,11 @@ public class AsyncConnectionImpl implements AsyncConnection {
 
   public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
     SocketAddress localAddress, User user) {
+    this(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
+  }
+
+  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
+    SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
     this.conf = conf;
     this.user = user;
     this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
@@ -142,8 +149,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
     } else {
       this.metrics = Optional.empty();
     }
-    this.rpcClient =
-      RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress,
+      metrics.orElse(null), connectionAttributes);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcTimeout =
       (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 4d4559f4b7a9..ac70091dcf65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.conf.Configuration;
@@ -216,21 +218,53 @@ public static Connection createConnection(Configuration conf, User user) throws
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool,
     final User user) throws IOException {
+    return createConnection(conf, pool, user, Collections.emptyMap());
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try {
+   *   table.get(...);
+   *   ...
+   * } finally {
+   *   table.close();
+   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param conf configuration
+   * @param user the user the connection is for
+   * @param pool the thread pool to use for batch operations
+   * @param connectionAttributes attributes to be sent along to server during connection
+   *          establishment
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(Configuration conf, ExecutorService pool,
+    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
     Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
       ConnectionOverAsyncConnection.class, Connection.class);
     if (clazz != ConnectionOverAsyncConnection.class) {
       try {
         // Default HCM#HCI is not accessible; make it so before invoking.
-        Constructor<?> constructor =
-          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
+        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
+          ExecutorService.class, User.class, Map.class);
         constructor.setAccessible(true);
-        return user.runAs((PrivilegedExceptionAction<
-          Connection>) () -> (Connection) constructor.newInstance(conf, pool, user));
+        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
+          .newInstance(conf, pool, user, connectionAttributes));
       } catch (Exception e) {
         throw new IOException(e);
       }
     } else {
-      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
+      return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
+        .toConnection();
     }
   }
@@ -281,6 +315,27 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
     final User user) {
+    return createAsyncConnection(conf, user, null);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
+   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
+   * interfaces created from returned connection share zookeeper connection, meta cache, and
+   * connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+   * as it is thread safe.
+   * @param conf configuration
+   * @param user the user the asynchronous connection is for
+   * @param connectionAttributes attributes to be sent along to server during connection
+   *          establishment
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
+    final User user, Map<String, byte[]> connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
       CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
       ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
@@ -300,7 +355,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
       try {
         future.complete(
           user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
-            .newInstance(clazz, conf, registry, clusterId, null, user)));
+            .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
       } catch (Exception e) {
         registry.close();
         future.completeExceptionally(e);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 23d14c272d2b..9e7f1a5b502e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -106,6 +107,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   private boolean running = true; // if client runs
 
   protected final Configuration conf;
+  protected final Map<String, byte[]> connectionAttributes;
   protected final String clusterId;
   protected final SocketAddress localAddr;
   protected final MetricsConnection metrics;
@@ -154,7 +156,7 @@ public AtomicInteger load(Address key) throws Exception {
    * @param metrics the connection metrics
    */
   public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
-    MetricsConnection metrics) {
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
     this.userProvider = UserProvider.instantiate(conf);
     this.localAddr = localAddr;
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -167,6 +169,7 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc
     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
 
     this.conf = conf;
+    this.connectionAttributes = connectionAttributes;
     this.codec = getCodec();
     this.compressor = getCompressor(conf);
     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
@@ -417,7 +420,7 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
-      hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+      hrc.getCallTimeout(), hrc.getPriority(), hrc.getAttributes(), new RpcCallback<Call>() {
       @Override
       public void run(Call call) {
         try (Scope scope = call.span.makeCurrent()) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
index 7fffdad935fc..3da00c5395d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -19,6 +19,8 @@
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import javax.net.SocketFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -41,7 +43,7 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
    * SocketFactory
    */
   BlockingRpcClient(Configuration conf) {
-    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
   }
 
   /**
@@ -53,8 +55,8 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
    * @param metrics the connection metrics
    */
   public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
-    MetricsConnection metrics) {
-    super(conf, clusterId, localAddr, metrics);
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+    super(conf, clusterId, localAddr, metrics, connectionAttributes);
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index d63d14940e78..81ad4d2f056d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -219,7 +219,7 @@ public void cleanup(IOException e) {
   BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
       rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
-      rpcClient.metrics);
+      rpcClient.metrics, rpcClient.connectionAttributes);
     this.rpcClient = rpcClient;
     this.connectionHeaderPreamble = getConnectionHeaderPreamble();
     ConnectionHeader header = getConnectionHeader();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 3c0e24e57145..0298942e29fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,6 +19,8 @@
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -56,6 +58,7 @@ class Call {
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
   final int priority;
+  final Map<String, byte[]> attributes;
   final MetricsConnection.CallStats callStats;
   private final RpcCallback<Call> callback;
   final Span span;
@@ -64,6 +67,13 @@ class Call {
   Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells,
     final Message responseDefaultType, int timeout, int priority, RpcCallback<Call> callback,
     MetricsConnection.CallStats callStats) {
+    this(id, md, param, cells, responseDefaultType, timeout, priority, Collections.emptyMap(),
+      callback, callStats);
+  }
+
+  Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells,
+    final Message responseDefaultType, int timeout, int priority, Map<String, byte[]> attributes,
+    RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
@@ -73,6 +83,7 @@ class Call {
     this.id = id;
     this.timeout = timeout;
     this.priority = priority;
+    this.attributes = attributes;
     this.callback = callback;
     this.span = Span.current();
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index 9bee88d599f7..caf0dae03ce5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +114,11 @@ public boolean hasCallTimeout() {
     return delegate.hasCallTimeout();
   }
 
+  @Override
+  public Map<String, byte[]> getAttributes() {
+    return Collections.emptyMap();
+  }
+
   @Override
   public void setFailed(IOException e) {
     delegate.setFailed(e);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index c60de7658f3d..2653387733fd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -71,6 +72,8 @@ public interface HBaseRpcController extends RpcController, CellScannable {
 
   boolean hasCallTimeout();
 
+  Map<String, byte[]> getAttributes();
+
   /**
    * Set failed with an exception to pass on. For use in async rpc clients
    * @param e exception to set with
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 99ed5c4d48b6..6b862acac9a0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -19,7 +19,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -166,6 +168,11 @@ public boolean hasCallTimeout() {
     return callTimeout != null;
   }
 
+  @Override
+  public Map<String, byte[]> getAttributes() {
+    return Collections.emptyMap();
+  }
+
   @Override
   public synchronized String errorText() {
     if (!done || exception == null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b509dcbd27b7..d6df6c974ccf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -25,6 +25,7 @@
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -44,10 +45,12 @@
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -126,6 +129,14 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta)
     if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
+    if (call.attributes != null && !call.attributes.isEmpty()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
+    }
     builder.setTimeout(call.timeout);
 
     return builder.build();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 231caa40a89e..ed0c4fffc724 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -55,7 +57,12 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
 
   public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
     MetricsConnection metrics) {
-    super(configuration, clusterId, localAddress, metrics);
+    this(configuration, clusterId, localAddress, metrics, Collections.emptyMap());
+  }
+
+  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+    super(configuration, clusterId, localAddress, metrics, connectionAttributes);
     Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
       NettyRpcClientConfigHelper.getEventLoopConfig(conf);
     if (groupAndChannelClass == null) {
@@ -75,7 +82,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre
 
   /** Used in test only. */
   public NettyRpcClient(Configuration configuration) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 48104038c217..3f9a58d51263 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection {
   NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
       rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
-      rpcClient.metrics);
+      rpcClient.metrics, rpcClient.connectionAttributes);
     this.rpcClient = rpcClient;
     this.eventLoop = rpcClient.group.next();
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 9b69b5234050..f1df572675c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -59,7 +61,7 @@ public static RpcClient createClient(Configuration conf, String clusterId) {
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
     MetricsConnection metrics) {
-    return createClient(conf, clusterId, null, metrics);
+    return createClient(conf, clusterId, null, metrics, Collections.emptyMap());
   }
 
   private static String getRpcClientClass(Configuration conf) {
@@ -81,10 +83,11 @@ private static String getRpcClientClass(Configuration conf) {
    * @return newly created RpcClient
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
-    SocketAddress localAddr, MetricsConnection metrics) {
+    SocketAddress localAddr, MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
     String rpcClientClass = getRpcClientClass(conf);
-    return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new Class<?>[] {
-      Configuration.class, String.class, SocketAddress.class, MetricsConnection.class },
-      new Object[] { conf, clusterId, localAddr, metrics });
+    return ReflectionUtils.instantiateWithCustomCtor(
+      rpcClientClass, new Class<?>[] { Configuration.class, String.class, SocketAddress.class,
+        MetricsConnection.class, Map.class },
+      new Object[] { conf, clusterId, localAddr, metrics, connectionAttributes });
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 912fa4fb0654..31698a1a1e8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -39,11 +40,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
@@ -70,6 +73,7 @@ abstract class RpcConnection {
   protected final CompressionCodec compressor;
 
   protected final MetricsConnection metrics;
+  private final Map<String, byte[]> connectionAttributes;
 
   protected final HashedWheelTimer timeoutTimer;
 
@@ -86,12 +90,13 @@ abstract class RpcConnection {
 
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,
     String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor,
-    MetricsConnection metrics) throws IOException {
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) throws IOException {
     this.timeoutTimer = timeoutTimer;
     this.codec = codec;
     this.compressor = compressor;
     this.conf = conf;
     this.metrics = metrics;
+    this.connectionAttributes = connectionAttributes;
     User ticket = remoteId.getTicket();
     this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
     this.useSasl = isSecurityEnabled;
@@ -169,6 +174,14 @@ protected final ConnectionHeader getConnectionHeader() {
     if (this.compressor != null) {
       builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
     }
+    if (connectionAttributes != null && !connectionAttributes.isEmpty()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute : connectionAttributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
+    }
     builder.setVersionInfo(ProtobufUtil.getVersionInfo());
     boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
     // if Crypto AES enable, setup Cipher transformation
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
index 6c97c19f96cc..54b351f00a3b 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -95,7 +96,7 @@ public class TestRpcBasedRegistryHedgedReads {
   public static final class RpcClientImpl implements RpcClient {
 
     public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
-      MetricsConnection metrics) {
+      MetricsConnection metrics, Map<String, byte[]> attributes) {
     }
 
     @Override
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 51e9e1e7755f..fc7f66129d35 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -1667,9 +1667,10 @@ private static class ConfigurationCaptorConnection implements Connection {
 
     private final Connection delegate;
 
-    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
-      throws IOException {
-      delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
+    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
+      delegate =
+        FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection();
 
       final String uuid = conf.get(UUID_KEY);
       if (uuid != null) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
index 7d099aa44e24..0c879bd5ace3 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
@@ -123,7 +123,8 @@ public static class MRSplitsConnection implements Connection {
     private final Configuration configuration;
     static final AtomicInteger creations = new AtomicInteger(0);
 
-    MRSplitsConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
+    MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
       this.configuration = conf;
       creations.incrementAndGet();
     }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
index 13e3831f6df6..f41282b8f4f8 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
@@ -212,8 +212,8 @@ private static class ConnectionForMergeTesting implements Connection {
       SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
     }
 
-    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user)
-      throws IOException {
+    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
     }
 
     @Override
diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
index 6426f0cb06cb..e992e681fbff 100644
--- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
@@ -92,6 +92,7 @@ message ConnectionHeader {
   optional VersionInfo version_info = 5;
   // the transformation for rpc AES encryption with Apache Commons Crypto
   optional string rpc_crypto_cipher_transformation = 6;
+  repeated NameBytesPair attribute = 7;
 }
 
 // This is sent by rpc server to negotiate the data if necessary
@@ -148,6 +149,7 @@ message RequestHeader {
   // See HConstants.
   optional uint32 priority = 6;
   optional uint32 timeout = 7;
+  repeated NameBytesPair attribute = 8;
 }
 
 message ResponseHeader {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 1dda6c32ca04..e2c11ab1d5e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.net.SocketAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -59,7 +60,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
 
   public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
     String clusterId, SocketAddress localAddress, User user) {
-    super(conf, registry, clusterId, localAddress, user);
+    super(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
index 197ddb71d7e6..cc97a39c7ee4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -27,6 +27,7 @@
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 
 /**
@@ -82,6 +83,8 @@ public interface RpcCall extends RpcCallContext {
   /** Returns The request header of this call. */
   RequestHeader getHeader();
 
+  ConnectionHeader getConnectionHeader();
+
   /** Returns Port of remote address in this call */
   int getRemotePort();
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 2188795914db..f3568a36f144 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -49,6 +49,7 @@
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -207,6 +208,11 @@ public RequestHeader getHeader() {
     return this.header;
   }
 
+  @Override
+  public RPCProtos.ConnectionHeader getConnectionHeader() {
+    return this.connection.connectionHeader;
+  }
+
   @Override
   public int getPriority() {
     return this.header.getPriority();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 65def75fff1b..d358695c5f9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -22,6 +22,7 @@
 
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -129,8 +130,8 @@ public void testAdminTimeout() throws Exception {
    */
   public static class RandomTimeoutRpcClient extends BlockingRpcClient {
     public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
-      MetricsConnection metrics) {
-      super(conf, clusterId, localAddr, metrics);
+      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+      super(conf, clusterId, localAddr, metrics, connectionAttributes);
     }
 
     // Return my own instance, one that does random timeouts
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java
new file mode 100644
index 000000000000..198e4766a276
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestRequestAndConnectionAttributes {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class);
+
+  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
+  static {
+    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
+  }
+  private static final Map<String, byte[]> REQUEST_ATTRIBUTES = new HashMap<>();
+
+  private static HBaseTestingUtil TEST_UTIL = null;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtil();
+    TEST_UTIL.getConfiguration().set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+      AttributesRpcControllerFactory.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testConnectionAttributes() throws IOException {
+    TableName tableName = TableName.valueOf("testConnectionAttributes");
+    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (Connection conn = ConnectionFactory.createConnection(conf, null,
+      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {
+      Result result = table.get(new Get(Bytes.toBytes(0)));
+      assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size());
+      for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
+        byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
+        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
+      }
+    }
+  }
+
+  @Test
+  public void testRequestAttributes() throws IOException {
+    TableName tableName = TableName.valueOf("testRequestAttributes");
+    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
+
+    REQUEST_ATTRIBUTES.clear();
+    REQUEST_ATTRIBUTES.put("test1", Bytes.toBytes("a"));
+    REQUEST_ATTRIBUTES.put("test2", Bytes.toBytes("b"));
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (Connection conn = ConnectionFactory.createConnection(conf, null,
+      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {
+
+      Result result = table.get(new Get(Bytes.toBytes(0)));
+      assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size());
+      for (Map.Entry<String, byte[]> attr : REQUEST_ATTRIBUTES.entrySet()) {
+        byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey()));
+        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
+      }
+
+      REQUEST_ATTRIBUTES.put("test3", Bytes.toBytes("c"));
+      result = table.get(new Get(Bytes.toBytes(0)));
+      assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size());
+      for (Map.Entry<String, byte[]> attr : REQUEST_ATTRIBUTES.entrySet()) {
+        byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey()));
+        assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
+      }
+    }
+  }
+
+  @Test
+  public void testNoRequestAttributes() throws IOException {
+    TableName tableName = TableName.valueOf("testNoRequestAttributes");
+    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
+
+    REQUEST_ATTRIBUTES.clear();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (Connection conn = ConnectionFactory.createConnection(conf, null,
+      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
+      TableBuilder tableBuilder = conn.getTableBuilder(tableName, null);
+      try (Table table = tableBuilder.build()) {
+        Result result = table.get(new Get(Bytes.toBytes(0)));
+        assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
+      }
+    }
+  }
+
+  public static class AttributesRpcControllerFactory extends RpcControllerFactory {
+
+    public AttributesRpcControllerFactory(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public HBaseRpcController newController() {
+      return new AttributesRpcController(super.newController());
+    }
+
+    @Override
+    public HBaseRpcController newController(CellScanner cellScanner) {
+      return new AttributesRpcController(super.newController(cellScanner));
+    }
+
+    @Override
+    public HBaseRpcController newController(RegionInfo regionInfo, CellScanner cellScanner) {
+      return new AttributesRpcController(super.newController(regionInfo, cellScanner));
+    }
+
+    @Override
+    public HBaseRpcController newController(List<CellScannable> cellIterables) {
+      return new AttributesRpcController(super.newController(cellIterables));
+    }
+
+    @Override
+    public HBaseRpcController newController(RegionInfo regionInfo,
+      List<CellScannable> cellIterables) {
+      return new AttributesRpcController(super.newController(regionInfo, cellIterables));
+    }
+  }
+
+  public static class AttributesRpcController extends DelegatingHBaseRpcController {
+
+    public AttributesRpcController(HBaseRpcController delegate) {
+      super(delegate);
+    }
+
+    @Override
+    public Map<String, byte[]> getAttributes() {
+      return REQUEST_ATTRIBUTES;
+    }
+  }
+
+  public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+      List<Cell> result) throws IOException {
+      RpcCall rpcCall = RpcServer.getCurrentCall().get();
+      for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) {
+        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
+          .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName()))
+          .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
+      }
+      for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) {
+        result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
+          .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName()))
+          .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
+      }
+      result.sort(CellComparator.getInstance());
+      c.bypass();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index f36fef186f08..feaf44e0b84e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -71,8 +72,8 @@ public MyRpcClientImpl(Configuration conf) {
     }
 
     public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
-      MetricsConnection metrics) {
-      super(conf, clusterId, address, metrics);
+      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+      super(conf, clusterId, address, metrics, connectionAttributes);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
index e14b710647d1..80b3845d6688 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -124,7 +124,7 @@ public void test() throws IOException, InterruptedException {
     int callId = 10;
     Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
       EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
-      HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
+      HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
     RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
     dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
     requestHeader.writeDelimitedTo(dos);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index 909e7fdb7f3d..7a82d08f4adb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -694,6 +694,11 @@ public RPCProtos.RequestHeader getHeader() {
         return null;
       }
 
+      @Override
+      public RPCProtos.ConnectionHeader getConnectionHeader() {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index d26870b77dfd..9bbbdbeebad0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -220,6 +220,11 @@ public RPCProtos.RequestHeader getHeader() {
         return null;
       }
 
+      @Override
+      public RPCProtos.ConnectionHeader getConnectionHeader() {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
index 250b8a74f030..bdb54c2bddd5 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
@@ -89,8 +89,8 @@ public class ThriftConnection implements Connection {
   private int operationTimeout;
   private int connectTimeout;
 
-  public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
-    throws IOException {
+  public ThriftConnection(Configuration conf, ExecutorService pool, final User user,
+    Map<String, byte[]> connectionAttributes) throws IOException {
     this.conf = conf;
     this.user = user;
     this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
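
Reviewer note, not part of the patch: the snippet below is a minimal, hypothetical sketch of how a client could exercise the new API once this change lands. The class name, table name, and attribute keys and values are invented for illustration; the pieces it relies on (the createConnection overload taking a map of connection attributes, HBaseRpcController#getAttributes(), and the RpcControllerFactory hook) are exactly the ones added above.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionAttributesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Connection attributes are serialized once into the ConnectionHeader
    // (see the RpcConnection#getConnectionHeader() change above), so the
    // server can identify this client for the lifetime of the connection.
    Map<String, byte[]> connAttrs = new HashMap<>();
    connAttrs.put("clientId", Bytes.toBytes("my-service"));

    try (Connection conn =
      ConnectionFactory.createConnection(conf, null, User.getCurrent(), connAttrs);
      Table table = conn.getTable(TableName.valueOf("t1"))) {
      // Request attributes ride on every RequestHeader instead; they are read
      // from HBaseRpcController.getAttributes() in AbstractRpcClient#callMethod,
      // so supplying non-empty ones requires a custom RpcControllerFactory,
      // as TestRequestAndConnectionAttributes demonstrates.
      table.get(new Get(Bytes.toBytes("row")));
    }
  }
}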