hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (10 additions, 0 deletions)

@@ -1583,6 +1583,16 @@ public enum OperationStatusCode {
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;

+/**
+ * Number of rows in a batch operation above which a warning will be logged.
+ */
+public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+/**
+ * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+ */
+public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
Contributor: Why the move? HConstants is often considered an anti-pattern; better to have constants beside where they are used?

Contributor Author: We want both RSRpcServices and ReplicationSink to use these constants. Would you still recommend having duplicate copies?

Contributor: Ok. Two very disparate contexts. This is probably the only case for HConstants.

private HConstants() {
// Can't be instantiated with this ctor.
}
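Both RSRpcServices and ReplicationSink now resolve this key the same way at construction time. A minimal standalone sketch of that lookup (the demo class name is hypothetical; the HConstants fields and Configuration.getInt are the APIs used in the diff):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical demo: the shared lookup both consumers perform.
public class BatchThresholdDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to 5000 when hbase.rpc.rows.warning.threshold is unset.
    int rowSizeWarnThreshold = conf.getInt(
      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    System.out.println("batch rows warning threshold = " + rowSizeWarnThreshold);
  }
}
```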
@@ -294,15 +294,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

-/**
- * Number of rows in a batch operation above which a warning will be logged.
- */
-static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-/**
- * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
- */
-static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
/*
* Whether to reject rows with size > threshold defined by
* {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1265,7 +1256,8 @@ public RSRpcServices(final HRegionServer rs) throws IOException {
final Configuration conf = rs.getConfiguration();
this.ld = ld;
regionServer = rs;
-rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+rowSizeWarnThreshold = conf.getInt(
+  HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
rejectRowsWithSizeOverThreshold =
conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);

@@ -212,7 +212,7 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
-this.replicationSink = new ReplicationSink(this.conf, this.server);
+this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,7 +38,6 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
@@ -54,6 +54,7 @@
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,15 +94,21 @@ public class ReplicationSink {
private SourceFSConfigurationProvider provider;
private WALEntrySinkFilter walEntrySinkFilter;

+/**
+ * Row size threshold for multi requests above which a warning is logged
+ */
+private final int rowSizeWarnThreshold;
+
/**
* Create a sink for replication
* @param conf conf object
- * @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
*/
-public ReplicationSink(Configuration conf, Stoppable stopper)
+public ReplicationSink(Configuration conf)
Contributor: Is the Stoppable unused? Usually it's a chain for a Service to pull on when it meets a condition it can't deal with... one that is so bad it wants to stop the process.

Contributor Author: Agreed on the usual Stoppable usage, but here it is unused anyway :(

Contributor: Ok

throws IOException {
this.conf = HBaseConfiguration.create(conf);
+rowSizeWarnThreshold = conf.getInt(
+  HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
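A note on the Stoppable discussion above: the interface gives a component a chain to pull when it hits an unrecoverable condition. A minimal sketch of that pattern (the service class is hypothetical; Stoppable's stop(String) and isStopped() are the real interface methods):

```java
import org.apache.hadoop.hbase.Stoppable;

// Hypothetical service illustrating the Stoppable chain the reviewer
// describes: on an unrecoverable failure, ask the owning process to stop.
public class FlakyService {
  private final Stoppable stopper;

  public FlakyService(Stoppable stopper) {
    this.stopper = stopper;
  }

  public void doWork() {
    boolean unrecoverable = true; // stand-in for a real failure check
    if (unrecoverable && !stopper.isStopped()) {
      stopper.stop("FlakyService hit an unrecoverable condition");
    }
  }
}
```

ReplicationSink never pulled that chain, which is why the parameter could be dropped here.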
@@ -211,11 +218,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-  bulkLoadsPerClusters.get(bld.getClusterIdsList());
-if (bulkLoadHFileMap == null) {
-  bulkLoadHFileMap = new HashMap<>();
-  bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-}
+  bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
@@ -250,7 +253,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-batch(entry.getKey(), entry.getValue().values());
+batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
@@ -366,16 +369,8 @@ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1,
K2 key2, V value) {
-Map<K2,List<V>> innerMap = map.get(key1);
-if (innerMap == null) {
-  innerMap = new HashMap<>();
-  map.put(key1, innerMap);
-}
-List<V> values = innerMap.get(key2);
-if (values == null) {
-  values = new ArrayList<>();
-  innerMap.put(key2, values);
-}
+Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
values.add(value);
return values;
}
@@ -403,13 +398,24 @@ public void stopReplicationSinkServices() {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
+ * @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
-private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+  throws IOException {
if (allRows.isEmpty()) {
return;
}
AsyncTable<?> table = getConnection().getTable(tableName);
-List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+List<Future<?>> futures = new ArrayList<>();
+for (List<Row> rows : allRows) {
+  List<List<Row>> batchRows;
+  if (rows.size() > batchRowSizeThreshold) {
+    batchRows = Lists.partition(rows, batchRowSizeThreshold);
+  } else {
+    batchRows = Collections.singletonList(rows);
+  }
+  futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
+}
Contributor: For the edits that are beyond the limit, they get handled in the next batch?

Contributor Author (@virajjasani, Jul 23, 2020): That's true, they are handled in the next batch.

Contributor Author: The UT testLargeEditsPutDelete() added in this PR ensures that this is always handled.

for (Future<?> future : futures) {
try {
FutureUtils.get(future);
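For reference, Lists.partition (the diff imports the hbase-thirdparty relocation of the Guava class) splits a list into consecutive sublists of at most the given size, with only the last one possibly shorter; each sublist then becomes its own batchAll call, so no single multi request carries more rows than the threshold. A minimal sketch using plain Guava (the demo class is hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;

// Hypothetical demo of how an over-threshold batch is split.
public class PartitionDemo {
  public static void main(String[] args) {
    List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    int batchRowSizeThreshold = 3;
    // Consecutive sublists of at most 3 rows each.
    List<List<Integer>> batches = Lists.partition(rows, batchRowSizeThreshold);
    System.out.println(batches); // prints [[1, 2, 3], [4, 5, 6], [7]]
  }
}
```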
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -80,8 +81,8 @@ public void setupTest() throws Exception {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
-THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-  RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+  HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
@@ -55,7 +54,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,7 +77,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {

@ClassRule
@@ -127,9 +126,8 @@ public void stop(String why) {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());

TEST_UTIL.startMiniCluster(3);
-SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -202,6 +200,40 @@ public void testMixedPutDelete() throws Exception {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}

+@Test
+public void testLargeEditsPutDelete() throws Exception {
+  List<WALEntry> entries = new ArrayList<>();
+  List<Cell> cells = new ArrayList<>();
+  for (int i = 0; i < 5510; i++) {
+    entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+  }
+  SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+    baseNamespaceDir, hfileArchiveDir);
+
+  // 5510 puts exceed the 5000-row default threshold, so the sink splits them
+  // into two batches; every row must still be replicated.
+  ResultScanner resultScanner = table1.getScanner(new Scan());
+  int totalRows = 0;
+  while (resultScanner.next() != null) {
+    totalRows++;
+  }
+  assertEquals(5510, totalRows);
+
+  entries = new ArrayList<>();
+  cells = new ArrayList<>();
+  for (int i = 0; i < 11000; i++) {
+    entries.add(
+      createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+        cells));
+  }
+  SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+    baseNamespaceDir, hfileArchiveDir);
+  // Puts land on the 5500 odd-numbered rows and deletes clear the
+  // even-numbered ones, so exactly 5500 rows remain.
+  resultScanner = table1.getScanner(new Scan());
+  totalRows = 0;
+  while (resultScanner.next() != null) {
+    totalRows++;
+  }
+  assertEquals(5500, totalRows);
+}

/**
* Insert to 2 different tables
* @throws Exception
Expand All @@ -220,7 +252,11 @@ public void testMixedPutTables() throws Exception {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
-assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+assertEquals(0, Bytes.toInt(res.getRow()) % 2);
}
+scanRes = table1.getScanner(scan);
+for (Result res : scanRes) {
+  assertEquals(1, Bytes.toInt(res.getRow()) % 2);
+}
}

@@ -111,7 +111,7 @@ public void testWALEntryFilter() throws IOException {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
-ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+ReplicationSink sink = new ReplicationSink(conf);
// Create some dumb walentries.
List<AdminProtos.WALEntry> entries = new ArrayList<>();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();