@@ -62,6 +62,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
@@ -76,6 +77,7 @@

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -101,6 +103,7 @@
public final class BackupSystemTable implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
private static final int BATCH_SIZE = 1000;

static class WALItem {
String backupId;
@@ -414,7 +417,7 @@ public void writePathsPostBulkLoad(TableName tabName, byte[] region,
}
try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
table.put(puts);
executePartitionedBatches(table, puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}
@@ -453,7 +456,7 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
lstDels.add(del);
LOG.debug("orig deleting the row: " + Bytes.toString(row));
}
table.delete(lstDels);
executePartitionedBatches(table, lstDels);
LOG.debug("deleted " + rows.size() + " original bulkload rows");
}
}
@@ -558,7 +561,7 @@ public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Pa
}
}
if (!puts.isEmpty()) {
table.put(puts);
executePartitionedBatches(table, puts);
}
}
}
@@ -918,7 +921,7 @@ public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Lon
puts.add(put);
}
try (Table table = connection.getTable(tableName)) {
table.put(puts);
executePartitionedBatches(table, puts);
}
}

@@ -1902,4 +1905,19 @@ private static void ensureTableEnabled(Admin admin, TableName tableName) throws
}
}
}

/**
* Executes the given operations in partitioned batches of size {@link #BATCH_SIZE}.
*/
private static void executePartitionedBatches(Table table, List<? extends Row> operations)
throws IOException {
List<? extends List<? extends Row>> operationBatches = Lists.partition(operations, BATCH_SIZE);
for (List<? extends Row> batch : operationBatches) {
try {
// Table.batch() requires a results array sized to the batch; the results are not inspected here.
table.batch(batch, new Object[batch.size()]);
} catch (InterruptedException e) {
// Restore the interrupt flag and fail the call instead of silently continuing with the
// remaining batches.
Thread.currentThread().interrupt();
throw new IOException("Interrupted while executing batch", e);
}
}
}
}
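
For context on how the new helper is meant to be driven, here is a minimal, self-contained sketch of the same partition-then-batch pattern. It is not code from this change: the class name PartitionedBatchDemo, the "demo" table, the "f"/"q" column coordinates and the 5,000-row loop are illustrative assumptions (and it assumes the hbase-client and hbase-thirdparty jars are on the classpath); only Lists.partition, Table.batch and the 1000-row batch size mirror the helper above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class PartitionedBatchDemo {
  // Mirrors BackupSystemTable.BATCH_SIZE introduced in this change.
  private static final int BATCH_SIZE = 1000;

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("demo"))) { // hypothetical table
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 5000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));
        puts.add(put);
      }
      // Same idea as executePartitionedBatches(): never hand the client more than
      // BATCH_SIZE mutations in a single batch() call.
      for (List<? extends Row> batch : Lists.partition(puts, BATCH_SIZE)) {
        try {
          table.batch(batch, new Object[batch.size()]);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while writing batch", e);
        }
      }
    }
  }
}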