diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 1a2c65c1c896..8aff6eb2c5b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil; import org.apache.htrace.Trace; /** @@ -234,6 +236,8 @@ public String toString() { new ConcurrentHashMap(); // Start configuration settings. private final int startLogErrorsCnt; + private Configuration conf; + private List customThreadAttributes; /** * The number of tasks simultaneously executed on the cluster. @@ -318,6 +322,8 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po this.connection = hc; this.pool = pool; this.globalErrors = useGlobalErrors ? new BatchErrors() : null; + this.conf = conf; + this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf); this.id = COUNTER.incrementAndGet(); @@ -607,7 +613,7 @@ AsyncRequestFuture submitMultiActions(TableName tableName, /** * Helper that is used when grouping the actions per region server. * - * @param loc - the destination. Must not be null. + * @param server - the destination. Must not be null. * @param action - the action to add to the multiaction * @param actionsByServer the multiaction per server * @param nonceGroup Nonce group. @@ -711,40 +717,49 @@ public ReplicaCallIssuingRunnable(List> initialActions, long startTi @Override public void run() { - boolean done = false; - if (primaryCallTimeoutMicroseconds > 0) { - try { - done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); - } catch (InterruptedException ex) { - LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + boolean done = false; + if (primaryCallTimeoutMicroseconds > 0) { + try { + done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); + } catch (InterruptedException ex) { + LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); + return; + } + } + if (done) { + // Done within primary timeout return; } - } - if (done) return; // Done within primary timeout - Map> actionsByServer = + Map> actionsByServer = new HashMap>(); - List> unknownLocActions = new ArrayList>(); - if (replicaGetIndices == null) { - for (int i = 0; i < results.length; ++i) { - addReplicaActions(i, actionsByServer, unknownLocActions); - } - } else { - for (int replicaGetIndice : replicaGetIndices) { - addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); - } - } - if (!actionsByServer.isEmpty()) { - sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); - } - if (!unknownLocActions.isEmpty()) { - actionsByServer = new HashMap>(); - for (Action action : unknownLocActions) { - addReplicaActionsAgain(action, actionsByServer); + List> unknownLocActions = new ArrayList>(); + if (replicaGetIndices == null) { + for (int i = 0; i < results.length; ++i) { + addReplicaActions(i, actionsByServer, unknownLocActions); + } + } else { + for (int 
replicaGetIndice : replicaGetIndices) { + addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); + } } - // Some actions may have completely failed, they are handled inside addAgain. if (!actionsByServer.isEmpty()) { - sendMultiAction(actionsByServer, 1, null, true); + sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); } + if (!unknownLocActions.isEmpty()) { + actionsByServer = new HashMap>(); + for (Action action : unknownLocActions) { + addReplicaActionsAgain(action, actionsByServer); + } + // Some actions may have completely failed, they are handled inside addAgain. + if (!actionsByServer.isEmpty()) { + sendMultiAction(actionsByServer, 1, null, true); + } + } + } + finally { + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } @@ -819,6 +834,7 @@ public void run() { MultiResponse res = null; PayloadCarryingServerCallable callable = currentCallable; try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); // setup the callable based on the actions, if we don't have one already from the request if (callable == null) { callable = createCallable(server, tableName, multiAction); @@ -858,6 +874,7 @@ public void run() { if (callsInProgress != null && callable != null && res != null) { callsInProgress.remove(callable); } + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index ac9046ca2083..b53f275959f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil; /** * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. 
@@ -441,6 +443,8 @@ static class FlushWorker implements Runnable { private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private Configuration conf; + private List customThreadAttributes; public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -456,6 +460,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); + this.conf = conf; + this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf); } protected LinkedBlockingQueue getQueue() { @@ -512,12 +518,14 @@ boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOExcepti public void run() { boolean succ = false; try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); } finally { FlushWorker.this.getRetryInQueue().decrementAndGet(); if (!succ) { FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); } + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } }, delayMs, TimeUnit.MILLISECONDS); @@ -559,6 +567,7 @@ ScheduledExecutorService getExecutor() { public void run() { int failedCount = 0; try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); long start = EnvironmentEdgeManager.currentTime(); // drain all the queued puts into the tmp list @@ -653,6 +662,7 @@ public void run() { } finally { // Update the totalFailedCount this.totalFailedPutCount.addAndGet(failedCount); + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java index 08b5e9b7562a..ee8e4d0518ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,6 +31,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +61,8 @@ public class MasterAddressRefresher implements Closeable { private final long periodicRefreshMs; private final long timeBetweenRefreshesMs; private final Object refreshMasters = new Object(); + private Configuration conf; + private List customThreadAttributes; @Override public void close() { @@ -72,31 +77,36 @@ private class RefreshThread implements Runnable { @Override public void run() { long lastRpcTs = 0; - while (!Thread.interrupted()) { - try { - // Spurious wake ups are 
okay, worst case we make an extra RPC call to refresh. We won't - // have duplicate refreshes because once the thread is past the wait(), notify()s are - // ignored until the thread is back to the waiting state. - synchronized (refreshMasters) { - refreshMasters.wait(periodicRefreshMs); + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(periodicRefreshMs); + } + long currentTs = EnvironmentEdgeManager.currentTime(); + if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) { + continue; + } + lastRpcTs = currentTs; + LOG.debug("Attempting to refresh master address end points."); + Set newMasters = new HashSet<>(registry.getMasters()); + registry.populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (IOException e) { + LOG.debug("Error populating latest list of masters.", e); } - long currentTs = EnvironmentEdgeManager.currentTime(); - if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) { - continue; - } - lastRpcTs = currentTs; - LOG.debug("Attempting to refresh master address end points."); - Set newMasters = new HashSet<>(registry.getMasters()); - registry.populateMasterStubs(newMasters); - LOG.debug("Finished refreshing master end points. {}", newMasters); - } catch (InterruptedException e) { - LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); - break; - } catch (IOException e) { - LOG.debug("Error populating latest list of masters.", e); } + LOG.info("Master end point refresher loop exited."); + } finally{ + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } - LOG.info("Master end point refresher loop exited."); } } @@ -111,6 +121,8 @@ public void run() { Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs); this.registry = registry; pool.submit(new RefreshThread()); + this.conf = conf; + this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d8b2e2c54b72..8dc0603cada4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -43,6 +43,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; +import java.util.List; import java.util.Locale; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -70,6 +71,8 @@ import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute; +import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import 
org.apache.hadoop.net.NetUtils; @@ -93,6 +96,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "We are always under lock actually") private Thread thread; + private Configuration conf; + private List customThreadAttributes; // connected socket. protected for writing UT. protected Socket socket = null; @@ -168,33 +173,38 @@ public void remove(Call call) { @Override public void run() { synchronized (BlockingRpcConnection.this) { - while (!closed) { - if (callsToWrite.isEmpty()) { - // We should use another monitor object here for better performance since the read - // thread also uses ConnectionImpl.this. But this makes the locking schema more - // complicated, can do it later as an optimization. - try { - BlockingRpcConnection.this.wait(); - } catch (InterruptedException e) { + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + while (!closed) { + if (callsToWrite.isEmpty()) { + // We should use another monitor object here for better performance since the read + // thread also uses ConnectionImpl.this. But this makes the locking schema more + // complicated, can do it later as an optimization. + try { + BlockingRpcConnection.this.wait(); + } catch (InterruptedException e) { + } + // check if we need to quit, so continue the main loop instead of fallback. + continue; } - // check if we need to quit, so continue the main loop instead of fallback. - continue; - } - Call call = callsToWrite.poll(); - if (call.isDone()) { - continue; - } - try { - tracedWriteRequest(call); - } catch (IOException e) { - // exception here means the call has not been added to the pendingCalls yet, so we need - // to fail it by our own. - if (LOG.isDebugEnabled()) { - LOG.debug("call write error for call #" + call.id, e); + Call call = callsToWrite.poll(); + if (call.isDone()) { + continue; + } + try { + tracedWriteRequest(call); + } catch (IOException e) { + // exception here means the call has not been added to the pendingCalls yet, so we + // need to fail it by our own. + if (LOG.isDebugEnabled()) { + LOG.debug("call write error for call #" + call.id, e); + } + call.setException(e); + closeConn(e); } - call.setException(e); - closeConn(e); } + } finally { + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } } @@ -237,6 +247,8 @@ public void cleanup(IOException e) { } else { callSender = null; } + this.conf = rpcClient.conf; + this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(rpcClient.conf); } // protected for write UT. 
@@ -335,14 +347,19 @@ private synchronized boolean waitForWork() { @Override public void run() { - if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size()); - } - while (waitForWork()) { - readResponse(); - } - if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size()); + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size()); + } + while (waitForWork()) { + readResponse(); + } + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size()); + } + } finally { + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } } @@ -382,39 +399,45 @@ private void handleSaslConnectionFailure(final int currRetries, final int maxRet user.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws IOException, InterruptedException { - if (shouldAuthenticateOverKrb()) { - if (currRetries < maxRetries) { - if (LOG.isDebugEnabled()) { - LOG.debug("Exception encountered while connecting to " + "the server : " + ex); + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + if (shouldAuthenticateOverKrb()) { + if (currRetries < maxRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception encountered while connecting to " + "the server : " + ex); + } + // try re-login + relogin(); + disposeSasl(); + // have granularity of milliseconds + // we are sleeping with the Connection lock held but since this + // connection instance is being used for connecting to the server + // in question, it is okay + Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); + return null; + } else { + String msg = + "Couldn't setup connection for " + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; + LOG.warn(msg, ex); + throw new IOException(msg, ex); } - // try re-login - relogin(); - disposeSasl(); - // have granularity of milliseconds - // we are sleeping with the Connection lock held but since this - // connection instance is being used for connecting to the server - // in question, it is okay - Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); - return null; } else { - String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; - LOG.warn(msg, ex); - throw new IOException(msg, ex); + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); } - } else { - LOG.warn("Exception encountered while connecting to " + "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException) ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." 
+ " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } finally { + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); } - throw new IOException(ex); } }); } @@ -460,7 +483,12 @@ private void setupIOstreams() throws IOException { continueSasl = ticket.doAs(new PrivilegedExceptionAction() { @Override public Boolean run() throws IOException { - return setupSaslConnection(in2, out2); + try { + CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf); + return setupSaslConnection(in2, out2); + } finally { + CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf); + } } }); } catch (Exception ex) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeType.java new file mode 100644 index 000000000000..2aa27ab69def --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeType.java @@ -0,0 +1,43 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.customthreadattribute; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Defines the type of the custom attribute and a property prefix for it + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public enum AttributeType { + // These string literals will be moved to HConstants + REQUEST_ID_CONTEXT(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX); + + private String propertyPrefix; + + private AttributeType(String propertyPrefix) { + this.propertyPrefix = propertyPrefix; + } + + @Override public String toString() { + return this.propertyPrefix; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeTypeHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeTypeHandler.java new file mode 100644 index 000000000000..8209b315874d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/AttributeTypeHandler.java @@ -0,0 +1,52 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.customthreadattribute; + +import java.util.List; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface AttributeTypeHandler { + + /** + * Get all the attributes that are enabled from the current thread's context + * + * @return List of {@link CustomThreadAttribute} + */ + public List getAllAttributes(); + + /** + * Sets the attributes into current thread's context + */ + public void setAttribute(String key, Object value); + + /** + * Clears the attributes from the current thread's context + */ + public void clearAttribute(String key); + + /** + * Get an attribute from the current thread's context + * + * @return {@link CustomThreadAttribute} + */ + public CustomThreadAttribute getAttribute(String key); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttribute.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttribute.java new file mode 100644 index 000000000000..638dad890372 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttribute.java @@ -0,0 +1,73 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util.customthreadattribute; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class defines a custom thread attribute + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CustomThreadAttribute { + private String key; + private Object value; + private AttributeType type; + + /** + * Constructor for initializing a custom thread attribute + * @param key An attribute's Key + * @param value An attribute's Value + * @param type An attribute's Type + */ + public CustomThreadAttribute(String key, Object value, AttributeType type) { + this.key = key; + this.value = value; + this.type = type; + } + + /** + * @return Attribute's Key + */ + public String getKey() { + return key; + } + + /** + * @return Attribute's Value + */ + public Object getValue() { + return value; + } + + /** + * @return Attribute's Type + */ + public AttributeType getType() { + return type; + } + + /** + * set Attribute's Type + */ + public void setType(AttributeType type) { + this.type = type; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtil.java new file mode 100644 index 000000000000..2a34b00f925a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtil.java @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.util.customthreadattribute;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A utility class for handling the set/get/clear operations of custom thread attributes
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CustomThreadAttributeUtil {
+
+  private static final Log LOG = LogFactory.getLog(CustomThreadAttributeUtil.class);
+
+  private CustomThreadAttributeUtil() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  /**
+   * Get all the attributes that are enabled from the current thread's context
+   *
+   * @param conf Cluster Configuration
+   * @return List of {@link CustomThreadAttribute}
+   */
+  public static List<CustomThreadAttribute> getAllAttributes(Configuration conf) {
+    List<CustomThreadAttribute> attributes = new ArrayList<>();
+    for (AttributeType attributeType : AttributeType.values()) {
+      if (isEnabled(attributeType, conf)) {
+        try {
+          AttributeTypeHandler handler =
+            getHandler(getImplementationClasspath(attributeType, conf));
+          List<CustomThreadAttribute> attributesOfSameType = handler.getAllAttributes();
+          for (CustomThreadAttribute attribute : attributesOfSameType) {
+            attribute.setType(attributeType);
+          }
+          attributes.addAll(attributesOfSameType);
+        } catch (Exception exception) {
+          LOG.error("An exception occurred while fetching all attributes", exception);
+        }
+      }
+    }
+    return attributes;
+  }
+
+  /**
+   * Sets the attributes into current thread's context
+   *
+   * @param attributes List of {@link CustomThreadAttribute}
+   * @param conf Cluster Configuration
+   */
+  public static void setAttributes(List<CustomThreadAttribute> attributes, Configuration conf) {
+    if (attributes == null || attributes.isEmpty()) {
+      return;
+    }
+
+    for (CustomThreadAttribute attribute : attributes) {
+      if (isEnabled(attribute.getType(), conf)) {
+        try {
+          AttributeTypeHandler handler =
+            getHandler(getImplementationClasspath(attribute.getType(), conf));
+          handler.setAttribute(attribute.getKey(), attribute.getValue());
+        } catch (Exception exception) {
+          LOG.error("An exception occurred while setting attribute " + attribute.getKey(),
+            exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * Clears the attributes from the current thread's context
+   *
+   * @param attributes List of {@link CustomThreadAttribute}
+   * @param conf Cluster Configuration
+   */
+  public static void clearAttributes(List<CustomThreadAttribute> attributes, Configuration conf) {
+    if (attributes == null || attributes.isEmpty()) {
+      return;
+    }
+
+    for (CustomThreadAttribute attribute : attributes) {
+      if (isEnabled(attribute.getType(), conf)) {
+        try {
+          AttributeTypeHandler handler =
+            getHandler(getImplementationClasspath(attribute.getType(), conf));
+          handler.clearAttribute(attribute.getKey());
+        } catch (Exception exception) {
+          LOG.error("An exception occurred while clearing attributes", exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get an attribute from the current thread's context
+   *
+   * @param attribute {@link CustomThreadAttribute} object with key and type set
+   * @param conf Cluster Configuration
+   * @return {@link CustomThreadAttribute}
+   */
+  public static CustomThreadAttribute getAttribute(CustomThreadAttribute attribute,
+    Configuration conf) {
+    CustomThreadAttribute value = null;
+    if (isEnabled(attribute.getType(), conf)) {
+      try {
+        AttributeTypeHandler handler =
+          getHandler(getImplementationClasspath(attribute.getType(), conf));
+        value = handler.getAttribute(attribute.getKey());
+        value.setType(attribute.getType());
+      } catch (Exception exception) {
+        LOG.error("An exception occurred while fetching attribute " + attribute.getKey(),
+          exception);
+      }
+    }
+    return value;
+  }
+
+  private static Boolean isEnabled(AttributeType attributeType, Configuration conf) {
+    String property = attributeType.toString() + HConstants.CUSTOM_THREAD_ATTRIBUTE_ENABLED_SUFFIX;
+    return conf.getBoolean(property, HConstants.CUSTOM_THREAD_ATTRIBUTE_DEFAULT_ENABLED);
+  }
+
+  private static String getImplementationClasspath(AttributeType attributeType,
+    Configuration conf) {
+    String property =
+      attributeType.toString() + HConstants.CUSTOM_THREAD_ATTRIBUTE_IMPLEMENTATION_SUFFIX;
+    return conf.get(property, null);
+  }
+
+  private static AttributeTypeHandler getHandler(String classpath)
+    throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
+    InstantiationException, IllegalAccessException {
+    Class<?> handlerClass = Class.forName(classpath);
+    handlerClass.getDeclaredConstructor().setAccessible(true);
+    return (AttributeTypeHandler) handlerClass.getDeclaredConstructor().newInstance();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/RequestIdContextHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/RequestIdContextHandler.java
new file mode 100644
index 000000000000..800f2adddc84
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/customthreadattribute/RequestIdContextHandler.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.customthreadattribute;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.slf4j.MDC;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RequestIdContextHandler implements AttributeTypeHandler {
+
+  private static String KEY = "RequestID";
+
+  RequestIdContextHandler() {
+  }
+
+  @Override public List<CustomThreadAttribute> getAllAttributes() {
+    List<CustomThreadAttribute> list = new ArrayList<>();
+    CustomThreadAttribute attribute = getAttribute(null);
+    if (attribute != null) {
+      list.add(attribute);
+    }
+    return list;
+  }
+
+  @Override public void setAttribute(String key, Object value) {
+    MDC.put(KEY, value.toString());
+  }
+
+  @Override public void clearAttribute(String key) {
+    MDC.clear();
+  }
+
+  @Override public CustomThreadAttribute getAttribute(String key) {
+    return new CustomThreadAttribute(KEY, MDC.get(KEY), null);
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtilTest.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtilTest.java
new file mode 100644
index 000000000000..57edba723fa0
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/customthreadattribute/CustomThreadAttributeUtilTest.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.customthreadattribute;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class) public class CustomThreadAttributeUtilTest {
+
+  static String KEY = "key";
+  static String VALUE = "value";
+
+  @Test
+  public void testCustomThreadAttributeUtil() {
+    // Initialize the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
+      + HConstants.CUSTOM_THREAD_ATTRIBUTE_ENABLED_SUFFIX, true);
+    conf.set(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
+      + HConstants.CUSTOM_THREAD_ATTRIBUTE_IMPLEMENTATION_SUFFIX,
+      "org.apache.hadoop.hbase.util.customthreadattribute.RequestIdContextHandler");
+
+    // Create the attribute
+    CustomThreadAttribute attribute =
+      new CustomThreadAttribute(KEY, VALUE, AttributeType.REQUEST_ID_CONTEXT);
+    List<CustomThreadAttribute> attributes = new ArrayList<>();
+    attributes.add(attribute);
+
+    // Set the attribute
+    CustomThreadAttributeUtil.setAttributes(attributes, conf);
+
+    // Assert that we get what we set
+    Assert.assertEquals(VALUE, CustomThreadAttributeUtil.getAttribute(attribute, conf).getValue());
+    Assert.assertEquals(VALUE, CustomThreadAttributeUtil.getAllAttributes(conf).get(0).getValue());
+
+    // Assert that the value is no longer set
+    CustomThreadAttributeUtil.clearAttributes(attributes, conf);
+    Assert.assertNull(CustomThreadAttributeUtil.getAttribute(attribute, conf).getValue());
+  }
+
+  @Test
+  public void testCustomThreadAttributeUtilException() {
+    // Initialize the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
+      + HConstants.CUSTOM_THREAD_ATTRIBUTE_ENABLED_SUFFIX, true);
+    conf.set(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
+      + HConstants.CUSTOM_THREAD_ATTRIBUTE_IMPLEMENTATION_SUFFIX,
+      "non.existing.class");
+
+    // Create the attribute
+    CustomThreadAttribute attribute =
+      new CustomThreadAttribute(KEY, VALUE, AttributeType.REQUEST_ID_CONTEXT);
+    List<CustomThreadAttribute> attributes = new ArrayList<>();
+    attributes.add(attribute);
+
+    // Set the attribute, doesn't throw any exception
+    CustomThreadAttributeUtil.setAttributes(attributes, conf);
+
+    // Assert the gets
+    Assert.assertNull(CustomThreadAttributeUtil.getAttribute(attribute, conf));
+    Assert.assertTrue(CustomThreadAttributeUtil.getAllAttributes(conf).isEmpty());
+
+    // Clear the attribute, doesn't throw any exception
+    CustomThreadAttributeUtil.clearAttributes(attributes, conf);
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 0522ad19f5a8..0314c8c99f00 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1456,6 +1456,18 @@ public static enum Modify {
   public static final boolean DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS =
     false;
 
+  /**
+   * Configs related to Custom Thread Attributes
+   */
+  public static final String CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX =
+    "thread.context.propagate.requestId";
+
+  public static final String CUSTOM_THREAD_ATTRIBUTE_ENABLED_SUFFIX = ".enabled";
+
+  public static final String CUSTOM_THREAD_ATTRIBUTE_IMPLEMENTATION_SUFFIX = ".implementation";
+
+  public static final Boolean CUSTOM_THREAD_ATTRIBUTE_DEFAULT_ENABLED = false;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
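
For reviewers, a minimal usage sketch of what this patch enables. The property names, the handler class, and the CustomThreadAttributeUtil calls come straight from the diff; the example class name, the connection setup, and the "req-12345" value are illustrative assumptions, not part of the patch.

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.customthreadattribute.AttributeType;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil;

// Hypothetical example class, not part of this patch.
public class RequestIdPropagationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Enable the request-id context and point it at the handler added by this patch.
    conf.setBoolean(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
      + HConstants.CUSTOM_THREAD_ATTRIBUTE_ENABLED_SUFFIX, true);
    conf.set(HConstants.CUSTOM_THREAD_ATTRIBUTE_REQUEST_ID_CONTEXT_PREFIX
      + HConstants.CUSTOM_THREAD_ATTRIBUTE_IMPLEMENTATION_SUFFIX,
      "org.apache.hadoop.hbase.util.customthreadattribute.RequestIdContextHandler");

    // Tag the calling thread's MDC before the client objects are created;
    // "req-12345" is a made-up value used only for illustration.
    CustomThreadAttribute requestId =
      new CustomThreadAttribute("RequestID", "req-12345", AttributeType.REQUEST_ID_CONTEXT);
    List<CustomThreadAttribute> attributes = Collections.singletonList(requestId);
    CustomThreadAttributeUtil.setAttributes(attributes, conf);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Client worker threads (AsyncProcess runnables, HTableMultiplexer flush workers,
      // BlockingRpcConnection reader/writer threads) snapshot the attributes captured from
      // this conf and re-apply them in their run() methods, so log lines they emit carry
      // the same MDC "RequestID" entry; each worker clears it again in its finally block.
    } finally {
      CustomThreadAttributeUtil.clearAttributes(attributes, conf);
    }
  }
}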