@@ -323,35 +323,60 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po

this.id = COUNTER.incrementAndGet();

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
ConnectionConfiguration connConf =
hc.getConfiguration() == conf
? hc.getConnectionConfiguration()
// Slow: parse conf in ConnectionConfiguration constructor
: new ConnectionConfiguration(conf);
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}

this.pause = connConf.getPause();
this.pauseForCQTBE = connConf.getPauseForCQTBE();

this.numTries = connConf.getRetriesNumber();
this.rpcTimeout = rpcTimeout;
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);

this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.operationTimeout = connConf.getOperationTimeout();

// Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
// Can be null when constructing hc's AsyncProcess or it's not reusable
AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;

this.primaryCallTimeoutMicroseconds =
globalAsyncProcess == null
? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
: globalAsyncProcess.primaryCallTimeoutMicroseconds;

this.maxTotalConcurrentTasks =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
: globalAsyncProcess.maxTotalConcurrentTasks;
this.maxConcurrentTasksPerServer =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerServer;
this.maxConcurrentTasksPerRegion =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerRegion;
this.maxHeapSizePerRequest =
globalAsyncProcess == null
? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE)
: globalAsyncProcess.maxHeapSizePerRequest;
this.maxHeapSizeSubmit =
globalAsyncProcess == null
? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE)
: globalAsyncProcess.maxHeapSizeSubmit;
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
globalAsyncProcess == null
? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
: globalAsyncProcess.startLogErrorsCnt;

if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
@@ -387,11 +412,16 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po

this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
this.logBatchErrorDetails =
globalAsyncProcess == null
? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false)
: globalAsyncProcess.logBatchErrorDetails;

this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
globalAsyncProcess == null
? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
: globalAsyncProcess.thresholdToLogUndoneTaskDetails;
}

public void setRpcTimeout(int rpcTimeout) {
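
A minimal standalone sketch of the reuse-or-reparse pattern the constructor above adopts. All names here (SlowParsedSettings, ConnectionLike, WorkerLike) and the property key are hypothetical stand-ins, not HBase classes; it only illustrates why the expensive parse happens once per connection and is repeated only for a caller-supplied Configuration.

import java.util.Properties;

public class ReuseParsedConfigSketch {

  /** Pretend this constructor is expensive, like parsing many Configuration keys. */
  static final class SlowParsedSettings {
    final long pause;
    SlowParsedSettings(Properties conf) {
      this.pause = Long.parseLong(conf.getProperty("client.pause", "100"));
    }
  }

  /** Stands in for the connection: it parses its Configuration once and caches the result. */
  static final class ConnectionLike {
    private final Properties conf;
    private final SlowParsedSettings cached;
    ConnectionLike(Properties conf) {
      this.conf = conf;
      this.cached = new SlowParsedSettings(conf); // parsed once per connection
    }
    Properties getConfiguration() { return conf; }
    SlowParsedSettings getSettings() { return cached; }
  }

  /** Stands in for AsyncProcess: reuse the cached parse only for the same Configuration object. */
  static final class WorkerLike {
    final long pause;
    WorkerLike(ConnectionLike conn, Properties conf) {
      SlowParsedSettings s =
          conn.getConfiguration() == conf ? conn.getSettings() : new SlowParsedSettings(conf);
      if (s == null) {
        s = new SlowParsedSettings(conf); // e.g. a mocked connection may hand back null
      }
      this.pause = s.pause;
    }
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("client.pause", "200");
    ConnectionLike conn = new ConnectionLike(conf);
    System.out.println(new WorkerLike(conn, conf).pause);             // 200: reuses the cached parse
    System.out.println(new WorkerLike(conn, new Properties()).pause); // 100: different conf, reparses
  }
}
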
@@ -34,7 +34,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -112,32 +111,32 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.pool = params.getPool();
this.listener = params.getListener();

ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
ConnectionConfiguration connConf = conn.getConnectionConfiguration();
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
params.getWriteBufferSize() : connConf.getWriteBufferSize();

// Set via the setter because it does value validation and starts/stops the TimerTask
long newWriteBufferPeriodicFlushTimeoutMs =
params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
? params.getWriteBufferPeriodicFlushTimeoutMs()
: tableConf.getWriteBufferPeriodicFlushTimeoutMs();
: connConf.getWriteBufferPeriodicFlushTimeoutMs();
long newWriteBufferPeriodicFlushTimerTickMs =
params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
? params.getWriteBufferPeriodicFlushTimerTickMs()
: tableConf.getWriteBufferPeriodicFlushTimerTickMs();
: connConf.getWriteBufferPeriodicFlushTimerTickMs();
this.setWriteBufferPeriodicFlush(
newWriteBufferPeriodicFlushTimeoutMs,
newWriteBufferPeriodicFlushTimerTickMs);

this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();

this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();

this.writeRpcTimeout = connConf.getWriteRpcTimeout();
this.operationTimeout = connConf.getOperationTimeout();
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
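
The inline lookup removed above resolved the write RPC timeout through a layered default; ConnectionConfiguration#getWriteRpcTimeout now carries the same logic. A minimal sketch of that resolution order, assuming 60000 ms matches HConstants.DEFAULT_HBASE_RPC_TIMEOUT:

import org.apache.hadoop.conf.Configuration;

public class WriteRpcTimeoutFallbackSketch {
  // Assumed to match HConstants.DEFAULT_HBASE_RPC_TIMEOUT (60000 ms).
  static final int DEFAULT_RPC_TIMEOUT = 60000;

  // Same shape as the lookup ConnectionConfiguration#getWriteRpcTimeout now backs:
  // the write-specific key wins, then the generic RPC timeout, then the default.
  static int resolveWriteRpcTimeout(Configuration conf) {
    return conf.getInt("hbase.rpc.write.timeout",
        conf.getInt("hbase.rpc.timeout", DEFAULT_RPC_TIMEOUT));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    System.out.println(resolveWriteRpcTimeout(conf)); // 60000: nothing configured
    conf.setInt("hbase.rpc.timeout", 30000);
    System.out.println(resolveWriteRpcTimeout(conf)); // 30000: generic timeout applies
    conf.setInt("hbase.rpc.write.timeout", 15000);
    System.out.println(resolveWriteRpcTimeout(conf)); // 15000: write-specific key wins
  }
}
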
@@ -12,6 +12,8 @@
package org.apache.hadoop.hbase.client;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -26,6 +28,7 @@
*/
@InterfaceAudience.Private
public class ConnectionConfiguration {
static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class);

public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
@@ -50,6 +53,10 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
private final int readRpcTimeout;
private final int writeRpcTimeout;
private final long pause;
private final long pauseForCQTBE;

/**
* Constructor
@@ -90,9 +97,28 @@ public class ConnectionConfiguration {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);

this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);

this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);

this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
}

/**
@@ -115,6 +141,10 @@ protected ConnectionConfiguration() {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
}

public long getWriteBufferSize() {
@@ -164,4 +194,20 @@ public int getMaxKeyValueSize() {
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}

public int getReadRpcTimeout() {
return readRpcTimeout;
}

public int getWriteRpcTimeout() {
return writeRpcTimeout;
}

public long getPause() {
return pause;
}

public long getPauseForCQTBE() {
return pauseForCQTBE;
}
}
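
A hedged usage sketch of the new ConnectionConfiguration getters and of the CQTBE-pause clamping added above. It assumes the one-argument constructor stays package-private (hence the package declaration, as the client tests use) and that a logged warning is the only side effect of a too-small value.

package org.apache.hadoop.hbase.client; // assumed: the one-arg constructor is package-private

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class PauseForCqtbeSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 200L);
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 50L); // smaller than the base pause
    ConnectionConfiguration cc = new ConnectionConfiguration(conf);
    // Per the constructor above, the too-small CQTBE pause is replaced by the base pause,
    // so both lines print 200 (and a warning is logged once, at construction time).
    System.out.println(cc.getPause());
    System.out.println(cc.getPauseForCQTBE());
  }
}
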
@@ -669,17 +669,8 @@ static class HConnectionImplementation implements ClusterConnection, Closeable {
this.managed = managed;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.pause = connectionConfig.getPause();
this.pauseForCQTBE = connectionConfig.getPauseForCQTBE();
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.metaReplicaCallTimeoutScanInMicroSecond =
@@ -366,12 +366,8 @@ private void finishSetup() throws IOException {
}
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
@@ -162,10 +162,11 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati
}
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
AsyncProcess asyncProcess =
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
Mockito.doNothing().when(c).incCount();
Mockito.doNothing().when(c).decCount();
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
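
Pulling the AsyncProcess construction out of thenReturn(...) likely matters because the reworked constructor now reads hc.getConnectionConfiguration() and hc.getAsyncProcess() from the connection, and touching a mock while a when(...) stubbing is still open confuses Mockito. A self-contained sketch of that pitfall, using hypothetical Service/Config/Worker types rather than the HBase classes:

import org.mockito.Mockito;

public class StubOrderingSketch {
  interface Config {}
  interface Worker {}
  interface Service {
    Config getConfig();
    Worker getWorker();
  }

  // Plays the role of the AsyncProcess constructor: it touches the mock while being built.
  static Worker buildWorker(Service svc) {
    svc.getConfig();
    return new Worker() {};
  }

  public static void main(String[] args) {
    Service svc = Mockito.mock(Service.class);

    // Risky: buildWorker(svc) would run while when(svc.getWorker()) is still being stubbed,
    // so the nested svc.getConfig() call can be mistaken for the invocation under stubbing.
    // Mockito.when(svc.getWorker()).thenReturn(buildWorker(svc));

    // Safe, mirroring the change above: construct first, then stub.
    Worker w = buildWorker(svc);
    Mockito.when(svc.getWorker()).thenReturn(w);
    System.out.println(svc.getWorker() == w); // true
  }
}
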