Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,14 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
+ MAX_FLUSH_PER_CHANGES);
}
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
int tmpRowLockDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
if (tmpRowLockDuration <= 0) {
LOG.info("Found hbase.rowlock.wait.duration set to {}. values <= 0 will cause all row " +
"locking to fail. Treating it as 1ms to avoid region failure.", tmpRowLockDuration);
tmpRowLockDuration = 1;
}
this.rowLockWaitDuration = tmpRowLockDuration;

this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6454,6 +6454,90 @@ public Void call() throws Exception {
CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
}

@Test
public void testBatchMutateWithZeroRowLockWait() throws Exception {
final byte[] a = Bytes.toBytes("a");
final byte[] b = Bytes.toBytes("b");
final byte[] c = Bytes.toBytes("c"); // exclusive

Configuration conf = new Configuration(CONF);
conf.setInt("hbase.rowlock.wait.duration", 0);
final RegionInfo hri =
RegionInfoBuilder.newBuilder(tableName).setStartKey(a).setEndKey(c).build();
final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1)).build();
region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), conf, htd, TEST_UTIL.createWal(conf, TEST_UTIL.getDataTestDirOnTestFS(method + ".log"), hri));

Mutation[] mutations = new Mutation[] {
new Put(a)
.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(a)
.setFamily(fam1)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Cell.Type.Put)
.build()),
new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(b)
.setFamily(fam1)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Cell.Type.Put)
.build())
};

OperationStatus[] status = region.batchMutate(mutations);
assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());


// test with a row lock held for a long time
final CountDownLatch obtainedRowLock = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(2);
Future<Void> f1 = exec.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
LOG.info("Acquiring row lock");
RowLock rl = region.getRowLock(b);
obtainedRowLock.countDown();
LOG.info("Waiting for 5 seconds before releasing lock");
Threads.sleep(5000);
LOG.info("Releasing row lock");
rl.release();
return null;
}
});
obtainedRowLock.await(30, TimeUnit.SECONDS);

Future<Void> f2 = exec.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Mutation[] mutations = new Mutation[] {
new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(a)
.setFamily(fam1)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Cell.Type.Put)
.build()),
new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(b)
.setFamily(fam1)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Cell.Type.Put)
.build()),
};
// when handling row b we are going to spin on the failure to get the row lock
// until the lock above is released, but we will still succeed so long as that
// takes less time then the test time out.
OperationStatus[] status = region.batchMutate(mutations);
assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
return null;
}
});

f1.get();
f2.get();
}

@Test
public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
region = initHRegion(tableName, method, CONF, fam1);
Expand Down