Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.thrift;

import java.io.IOException;
Expand All @@ -26,10 +25,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
Expand All @@ -41,7 +38,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MBeanUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;

/**
* This class will coalesce increments from a thift server if
Expand All @@ -53,7 +49,6 @@
*
*/
public class IncrementCoalescer implements IncrementCoalescerMBean {

/**
* Used to identify a cell that will be incremented.
*
Expand Down Expand Up @@ -84,10 +79,6 @@ public byte[] getRowKey() {
return rowKey;
}

public void setRowKey(byte[] rowKey) {
this.rowKey = rowKey;
}

public byte[] getFamily() {
return family;
}
Expand Down Expand Up @@ -117,45 +108,37 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
FullyQualifiedRow other = (FullyQualifiedRow) obj;
if (!Arrays.equals(family, other.family)) return false;
if (!Arrays.equals(qualifier, other.qualifier)) return false;
if (!Arrays.equals(rowKey, other.rowKey)) return false;
if (!Arrays.equals(table, other.table)) return false;
return true;
}

}
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}

static class DaemonThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
FullyQualifiedRow other = (FullyQualifiedRow) obj;

DaemonThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
}
if (!Arrays.equals(family, other.family)) {
return false;
}
if (!Arrays.equals(qualifier, other.qualifier)) {
return false;
}
if (!Arrays.equals(rowKey, other.rowKey)) {
return false;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (!t.isDaemon()) t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
return t;
return Arrays.equals(table, other.table);
}
}

private final AtomicLong failedIncrements = new AtomicLong();
private final AtomicLong successfulCoalescings = new AtomicLong();
private final AtomicLong totalIncrements = new AtomicLong();
private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
new ConcurrentHashMap<FullyQualifiedRow, Long>(100000, 0.75f, 1500);
new ConcurrentHashMap<>(100000, 0.75f, 1500);
private final ThreadPoolExecutor pool;
private final HBaseHandler handler;

Expand All @@ -167,22 +150,21 @@ public Thread newThread(Runnable r) {
public IncrementCoalescer(HBaseHandler hand) {
this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
pool =
new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
Threads.newDaemonThreadFactory("IncrementCoalescer"));

MBeanUtil.registerMBean("thrift", "Thrift", this);
}

public boolean queueIncrement(TIncrement inc) throws TException {
public boolean queueIncrement(TIncrement inc) {
if (!canQueue()) {
failedIncrements.incrementAndGet();
return false;
}
return internalQueueTincrement(inc);
}

public boolean queueIncrements(List<TIncrement> incs) throws TException {
public boolean queueIncrements(List<TIncrement> incs) {
if (!canQueue()) {
failedIncrements.incrementAndGet();
return false;
Expand All @@ -191,23 +173,24 @@ public boolean queueIncrements(List<TIncrement> incs) throws TException {
for (TIncrement tinc : incs) {
internalQueueTincrement(tinc);
}
return true;

return true;
}

private boolean internalQueueTincrement(TIncrement inc) throws TException {
private boolean internalQueueTincrement(TIncrement inc) {
byte[][] famAndQf = KeyValue.parseColumn(inc.getColumn());
if (famAndQf.length != 2) return false;
if (famAndQf.length != 2) {
return false;
}

return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
inc.getAmmount());
}

private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
byte[] qual, long ammount) throws TException {
byte[] qual, long ammount) {
int countersMapSize = countersMap.size();


//Make sure that the number of threads is scaled.
dynamicallySetCoreSize(countersMapSize);

Expand All @@ -221,7 +204,7 @@ private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] f
Long value = countersMap.remove(key);
if (value == null) {
// There was nothing there, create a new value
value = Long.valueOf(currentAmount);
value = currentAmount;
} else {
value += currentAmount;
successfulCoalescings.incrementAndGet();
Expand Down Expand Up @@ -293,7 +276,7 @@ public Integer call() throws Exception {
/**
* This method samples the incoming requests and, if selected, will check if
* the corePoolSize should be changed.
* @param countersMapSize
* @param countersMapSize the size of the counters map
*/
private void dynamicallySetCoreSize(int countersMapSize) {
// Here we are using countersMapSize as a random number, meaning this
Expand All @@ -302,9 +285,10 @@ private void dynamicallySetCoreSize(int countersMapSize) {
return;
}
double currentRatio = (double) countersMapSize / (double) maxQueueSize;
int newValue = 1;
int newValue;

if (currentRatio < 0.1) {
// it's 1
newValue = 1;
} else if (currentRatio < 0.3) {
newValue = 2;
} else if (currentRatio < 0.5) {
Expand All @@ -316,6 +300,7 @@ private void dynamicallySetCoreSize(int countersMapSize) {
} else {
newValue = 22;
}

if (pool.getCorePoolSize() != newValue) {
pool.setCorePoolSize(newValue);
}
Expand Down Expand Up @@ -391,5 +376,4 @@ public long getTotalIncrements() {
public long getCountersMapSize() {
return countersMap.size();
}

}