@@ -30,6 +30,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RateLimiter;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -55,18 +56,23 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import scala.Tuple2;

@@ -84,13 +90,14 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
private static final int SLEEP_TIME_MILLISECONDS = 100;

private static final Logger LOG = LogManager.getLogger(SparkHoodieHBaseIndex.class);
private static Connection hbaseConnection = null;
private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
private float qpsFraction;
private int maxQpsPerRegionServer;
private long totalNumInserts;
private int numWriteStatusWithInserts;

/**
* multiPutBatchSize will be computed and re-set in updateLocation if
* {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true.
@@ -109,7 +116,6 @@ public SparkHoodieHBaseIndex(HoodieWriteConfig config) {

private void init(HoodieWriteConfig config) {
this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
this.qpsFraction = config.getHbaseIndexQPSFraction();
this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
this.putBatchSizeCalculator = new HBasePutBatchSizeCalculator();
this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
@@ -163,7 +169,7 @@ private void addShutDownHook() {
*/
@Override
public void close() {
this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
LOG.info("No resources to release from Hbase index");
}

private Get generateStatement(String key) throws IOException {
@@ -185,12 +191,14 @@ private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String comm
private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(
HoodieTableMetaClient metaClient) {

// `multiGetBatchSize` is the number of gets issued per 100 ms batch. To express the same
// throughput as a per-second rate limit, multiply `multiGetBatchSize` by 10.
Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize();
return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
hoodieRecordIterator) -> {

int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();

RateLimiter limiter = RateLimiter.create(multiGetBatchSize * 10, TimeUnit.SECONDS);
// Grab the global HBase connection
synchronized (SparkHoodieHBaseIndex.class) {
if (hbaseConnection == null || hbaseConnection.isClosed()) {
@@ -211,7 +219,7 @@ private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>
continue;
}
// get results for the batch from HBase
Result[] results = doGet(hTable, statements);
Result[] results = doGet(hTable, statements, limiter);
// clear statements to be GC'd
statements.clear();
for (Result result : results) {
@@ -262,9 +270,12 @@ private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>
};
}

private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
sleepForTime(SLEEP_TIME_MILLISECONDS);
return hTable.get(keys);
private Result[] doGet(HTable hTable, List<Get> keys, RateLimiter limiter) throws IOException {
if (keys.size() > 0) {
limiter.tryAcquire(keys.size());
return hTable.get(keys);
}
return new Result[keys.size()];
}
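A minimal usage sketch of the get-side throttling above, with hypothetical numbers; the `RateLimiter.create(int, TimeUnit)` and `tryAcquire(int)` signatures are the ones used in this diff, and `hTable` is assumed in scope as in `doGet`:

int multiGetBatchSize = 100;                 // hypothetical: 100 gets per 100 ms batch
// Ten 100 ms windows per second, so the limiter budget is 100 * 10 = 1000 gets/sec.
RateLimiter limiter = RateLimiter.create(multiGetBatchSize * 10, TimeUnit.SECONDS);
List<Get> statements = nextBatchOfGets();    // hypothetical helper returning <= 100 Gets
if (!statements.isEmpty()) {
  limiter.tryAcquire(statements.size());     // one permit per key, charged before the call
  Result[] results = hTable.get(statements); // a tight loop is capped at ~1000 gets/sec
}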

@Override
@@ -285,11 +296,21 @@ private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateL
hbaseConnection = getHBaseConnection();
}
}
final long startTimeForPutsTask = DateTime.now().getMillis();
LOG.info("startTimeForPutsTask for this task: " + startTimeForPutsTask);

try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
while (statusIterator.hasNext()) {
WriteStatus writeStatus = statusIterator.next();
List<Mutation> mutations = new ArrayList<>();
try {
long numOfInserts = writeStatus.getStat().getNumInserts();
LOG.info("Num of inserts in this WriteStatus: " + numOfInserts);
LOG.info("Total inserts in this job: " + this.totalNumInserts);
LOG.info("multiPutBatchSize for this job: " + this.multiPutBatchSize);
// Create a rate limiter that allows `multiPutBatchSize` operations per second
// Any calls beyond `multiPutBatchSize` within a second will be rate limited
for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(rec.getKey())) {
Option<HoodieRecordLocation> loc = rec.getNewLocation();
@@ -312,17 +333,19 @@ private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateL
if (mutations.size() < multiPutBatchSize) {
continue;
}
doMutations(mutator, mutations);
doMutations(mutator, mutations, limiter);
}
// process remaining puts and deletes, if any
doMutations(mutator, mutations);
doMutations(mutator, mutations, limiter);
} catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e);
LOG.error(we);
writeStatus.setGlobalError(we);
}
writeStatusList.add(writeStatus);
}
final long endPutsTime = DateTime.now().getMillis();
LOG.info("hbase puts task time for this task: " + (endPutsTime - startTimeForPutsTask));
} catch (IOException e) {
throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
}
@@ -333,67 +356,95 @@ private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateL
/**
* Helper method to perform mutations (puts and deletes) against HBase.
*/
private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
private void doMutations(BufferedMutator mutator, List<Mutation> mutations, RateLimiter limiter) throws IOException {
if (mutations.isEmpty()) {
return;
}
// Charge the batch against the per-second rate limiter. Once limiter.getRate() operations
// have been acquired within a second, further calls are throttled until that second elapses.
limiter.tryAcquire(mutations.size());
mutator.mutate(mutations);
mutator.flush();
mutations.clear();
sleepForTime(SLEEP_TIME_MILLISECONDS);
}
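Compared with the fixed `SLEEP_TIME_MILLISECONDS` pause this diff removes, a permit-based limiter charges by batch size instead of pausing every batch uniformly. A rough sketch of the semantics `doMutations` relies on (illustrative only, not the actual `org.apache.hudi.common.util.RateLimiter` implementation):

// Illustrative fixed-window limiter: refills `maxPermits` once per second and
// makes over-budget callers wait out the remainder of the current window.
class SimplePermitLimiter {
  private final int maxPermits;
  private int available;
  private long windowStartMs = System.currentTimeMillis();

  SimplePermitLimiter(int maxPermits) {
    this.maxPermits = maxPermits;
    this.available = maxPermits;
  }

  synchronized boolean tryAcquire(int permits) throws InterruptedException {
    long now = System.currentTimeMillis();
    if (now - windowStartMs >= 1000) {
      windowStartMs = now;        // a new one-second window has started
      available = maxPermits;     // refill the budget
    }
    if (permits > available) {
      Thread.sleep(windowStartMs + 1000 - now);  // wait out this window
      windowStartMs = System.currentTimeMillis();
      available = maxPermits;
    }
    available -= permits;         // may dip below zero for oversized batches;
    return true;                  // a real limiter would split such requests
  }
}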

private static void sleepForTime(int sleepTimeMs) {
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
LOG.error("Sleep interrupted during throttling", e);
throw new RuntimeException(e);
public Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> writeStatusRDD) {
final Map<String, Integer> fileIdPartitionMap = new HashMap<>();
int partitionIndex = 0;
// Map each fileId that has inserts to a unique partition Id. This will be used while
// repartitioning RDD<WriteStatus>
final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
.map(w -> w.getFileId()).collect();
for (final String fileId : fileIds) {
fileIdPartitionMap.put(fileId, partitionIndex++);
}
return fileIdPartitionMap;
}

@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieEngineContext context,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, context);
LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> hoodieTable) {
final Option<Float> desiredQPSFraction = calculateQPSFraction(writeStatusRDD);
final Map<String, Integer> fileIdPartitionMap = mapFileWithInsertsToUniquePartition(writeStatusRDD);
JavaRDD<WriteStatus> partitionedRDD = this.numWriteStatusWithInserts == 0 ? writeStatusRDD :
writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
.partitionBy(new WriteStatusPartitioner(fileIdPartitionMap,
this.numWriteStatusWithInserts))
.map(w -> w._2());
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
true);
// caching the index updated status RDD
writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
// force trigger update location (HBase puts)
writeStatusJavaRDD.count();
this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
return writeStatusJavaRDD;
}

private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, final HoodieEngineContext context) {
private Option<Float> calculateQPSFraction(JavaRDD<WriteStatus> writeStatusRDD) {
if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
SparkConf conf = jsc.getConf();
int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) {
maxExecutors =
Math.max(maxExecutors, conf.getInt(DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1));
}

/*
* Each writeStatus represents status information from a write done in one of the IOHandles. If a writeStatus has
* any insert, it implies that the corresponding task contacts HBase for doing puts, since we only do puts for
* inserts from HBaseIndex.
Each writeStatus represents status information from a write done in one of the IOHandles.
If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
doing puts, since we only do puts for inserts from HBaseIndex.
*/
final Tuple2<Long, Integer> numPutsParallelismTuple = getHBasePutAccessParallelism(writeStatusRDD);
final long numPuts = numPutsParallelismTuple._1;
final int hbasePutsParallelism = numPutsParallelismTuple._2;
final Tuple2<Long, Integer> numPutsParallelismTuple = getHBasePutAccessParallelism(writeStatusRDD);
this.totalNumInserts = numPutsParallelismTuple._1;
this.numWriteStatusWithInserts = numPutsParallelismTuple._2;
this.numRegionServersForTable = getNumRegionServersAliveForTable();
final float desiredQPSFraction =
hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable);
final float desiredQPSFraction = this.hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(
this.totalNumInserts, this.numRegionServersForTable);
LOG.info("Desired QPSFraction :" + desiredQPSFraction);
LOG.info("Number HBase puts :" + numPuts);
LOG.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
final float availableQpsFraction =
hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts);
LOG.info("Number HBase puts :" + this.totalNumInserts);
LOG.info("Number of WriteStatus with inserts :" + numWriteStatusWithInserts);
return Option.of(desiredQPSFraction);
}
return Option.empty();
}
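To make the consumed tuple concrete: a hypothetical local computation mirroring what `getHBasePutAccessParallelism` derives from the RDD, for three WriteStatuses with 100, 0, and 50 new inserts.

long[] insertsPerStatus = {100L, 0L, 50L};   // hypothetical per-WriteStatus insert counts
long totalNumInserts = 0L;
int numWriteStatusWithInserts = 0;
for (long numInserts : insertsPerStatus) {
  totalNumInserts += numInserts;             // ends at 150
  if (numInserts > 0) {
    numWriteStatusWithInserts++;             // ends at 2: the zero-insert status
  }                                          // triggers no HBase puts
}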

private void acquireQPSResourcesAndSetBatchSize(final Option<Float> desiredQPSFraction,
final JavaSparkContext jsc) {
if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
SparkConf conf = jsc.getConf();
int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) {
maxExecutors = Math.max(maxExecutors, conf.getInt(
DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1));
}
final float availableQpsFraction = this.hBaseIndexQPSResourceAllocator
.acquireQPSResources(desiredQPSFraction.get(), this.totalNumInserts);
LOG.info("Allocated QPS Fraction :" + availableQpsFraction);
multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer,
hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, availableQpsFraction);
multiPutBatchSize = putBatchSizeCalculator
.getBatchSize(
numRegionServersForTable,
maxQpsPerRegionServer,
numWriteStatusWithInserts,
maxExecutors,
availableQpsFraction);
LOG.info("multiPutBatchSize :" + multiPutBatchSize);
}
}
Expand All @@ -406,7 +457,6 @@ public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteSta

public static class HBasePutBatchSizeCalculator implements Serializable {

private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
private static final Logger LOG = LogManager.getLogger(HBasePutBatchSizeCalculator.class);

/**
@@ -441,22 +491,26 @@ public static class HBasePutBatchSizeCalculator implements Serializable {
* </li>
* </p>
*/
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut,
int maxExecutors, int sleepTimeMs, float qpsFraction) {
int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer,
int numTasksDuringPut, int maxExecutors, float qpsFraction) {
int numRSAlive = numRegionServersForTable;
int maxReqPerSec = getMaxReqPerSec(numRSAlive, maxQpsPerRegionServer, qpsFraction);
int numTasks = numTasksDuringPut;
int maxParallelPutsTask = Math.max(1, Math.min(numTasks, maxExecutors));
int multiPutBatchSizePerSecPerTask = Math.max(1, (int) Math.ceil((double) maxReqPerSec / maxParallelPutsTask));
LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable);
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPutsTask);
LOG.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable);
LOG.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize);
return multiPutBatchSize;
LOG.info("HbaseIndexThrottling: multiPutBatchSizePerSecPerTask :" + multiPutBatchSizePerSecPerTask);
return multiPutBatchSizePerSecPerTask;
}

public int getMaxReqPerSec(int numRegionServersForTable, int maxQpsPerRegionServer, float qpsFraction) {
return (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
}
}
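A worked example for the calculator above, with hypothetical cluster numbers: 3 region servers alive, a maximum of 1000 QPS per region server, a QPS fraction of 0.5, 100 WriteStatuses with inserts, and 10 executors.

HBasePutBatchSizeCalculator calculator = new HBasePutBatchSizeCalculator();
// 0.5 * 3 * 1000 = 1500 requests per second budgeted for the whole job
int maxReqPerSec = calculator.getMaxReqPerSec(3, 1000, 0.5f);      // 1500
// min(100 tasks, 10 executors) = 10 concurrent put tasks, so each task
// gets a per-second put budget of ceil(1500 / 10) = 150
int batchSize = calculator.getBatchSize(3, 1000, 100, 10, 0.5f);   // 150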

@@ -510,4 +564,37 @@ public boolean isImplicitWithStorage() {
public void setHbaseConnection(Connection hbaseConnection) {
SparkHoodieHBaseIndex.hbaseConnection = hbaseConnection;
}

/**
* Assigns each WriteStatus that contains inserts to its own dedicated partition; WriteStatuses without
* inserts are spread across arbitrary partitions. This maximizes parallelism for Spark operations that
* are driven by the inserts in each WriteStatus.
*/
public static class WriteStatusPartitioner extends Partitioner {
private int totalPartitions;
final Map<String, Integer> fileIdPartitionMap;

public WriteStatusPartitioner(final Map<String, Integer> fileIdPartitionMap, final int totalPartitions) {
this.totalPartitions = totalPartitions;
this.fileIdPartitionMap = fileIdPartitionMap;
}

@Override
public int numPartitions() {
return this.totalPartitions;
}

@Override
public int getPartition(Object key) {
final String fileId = (String) key;
if (!fileIdPartitionMap.containsKey(fileId)) {
LOG.info("This WriteStatus (fileId: " + fileId + ") is not mapped because it has no inserts, "
+ "so it is assigned to an arbitrary partition.");
// Assign an arbitrary Spark partition to a `WriteStatus` that has no inserts. For a Spark operation
// that depends on the number of inserts, there is no performance penalty in packing these
// WriteStatuses together.
return Math.abs(fileId.hashCode()) % totalPartitions;
}
return fileIdPartitionMap.get(fileId);
}
}
}
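A small usage sketch for `WriteStatusPartitioner` (hypothetical fileIds; the `partitionBy` wiring mirrors the `updateLocation` change above):

Map<String, Integer> fileIdPartitionMap = new HashMap<>();
fileIdPartitionMap.put("file-001", 0);  // has inserts -> dedicated partition 0
fileIdPartitionMap.put("file-002", 1);  // has inserts -> dedicated partition 1

WriteStatusPartitioner partitioner = new WriteStatusPartitioner(fileIdPartitionMap, 2);
partitioner.getPartition("file-001");   // 0
partitioner.getPartition("file-002");   // 1
// "file-003" had no inserts, so it lands on an arbitrary but stable partition:
partitioner.getPartition("file-003");   // Math.abs("file-003".hashCode()) % 2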