From 76a3c542ba54f5b128a38a6e359871021ed40c57 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Thu, 18 Mar 2021 20:43:06 +0800
Subject: [PATCH] [HUDI-1705] Flush as per data bucket for mini-batch write

Detect the buffer size for each data bucket before flushing, so that
we avoid flushing data buckets that hold only a few records.
---
 .../apache/hudi/sink/StreamWriteFunction.java | 112 +++++++++++++-----
 .../hudi/sink/TestWriteCopyOnWrite.java       |   3 +-
 .../utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../utils/source/ContinuousFileSource.java    |  16 +--
 4 files changed, 86 insertions(+), 47 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a67cdae96b73a..53104fbc0270a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -47,6 +48,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -100,9 +102,9 @@ public class StreamWriteFunction<K, I, O>
   private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);

   /**
-   * Write buffer for a checkpoint.
+   * Write buffer as buckets for a checkpoint. The key is the bucket ID.
    */
-  private transient Map<String, List<HoodieRecord>> buffer;
+  private transient Map<String, DataBucket> buckets;

   /**
    * The buffer lock to control data buffering/flushing.
@@ -146,11 +148,6 @@
    */
   private transient OperatorEventGateway eventGateway;

-  /**
-   * The detector that tells if to flush the data as mini-batch.
-   */
-  private transient BufferSizeDetector detector;
-
   /**
    * Constructs a StreamingSinkFunction.
    *
@@ -163,7 +160,6 @@ public StreamWriteFunction(Configuration config) {
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -182,7 +178,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
       // it would check the validity.
       this.onCheckpointing = true;
       // wait for the buffer data flush out and request a new instant
-      flushBuffer(true, false);
+      flushRemaining(false);
       // signal the task thread to start buffering
       addToBufferCondition.signal();
     } finally {
@@ -198,8 +194,7 @@ public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, C
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
-      flushBufferOnCondition(value);
-      putDataIntoBuffer(value);
+      bufferRecord(value);
     } finally {
       bufferLock.unlock();
     }
@@ -221,7 +216,7 @@ public void notifyCheckpointComplete(long checkpointId) {
    * End input action for batch source.
   */
  public void endInput() {
-    flushBuffer(true, true);
+    flushRemaining(true);
    this.writeClient.cleanHandles();
  }

@@ -231,8 +226,12 @@
   @VisibleForTesting
   @SuppressWarnings("rawtypes")
-  public Map<String, List<HoodieRecord>> getBuffer() {
-    return buffer;
+  public Map<String, List<HoodieRecord>> getDataBuffer() {
+    Map<String, List<HoodieRecord>> ret = new HashMap<>();
+    for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().records);
+    }
+    return ret;
   }

   @VisibleForTesting
@@ -250,7 +249,7 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
   // -------------------------------------------------------------------------

   private void initBuffer() {
-    this.buffer = new LinkedHashMap<>();
+    this.buckets = new LinkedHashMap<>();
     this.bufferLock = new ReentrantLock();
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
@@ -278,6 +277,24 @@ private void initWriteFunction() {
     }
   }

+  /**
+   * Data bucket: the buffered records of one bucket plus its size detector.
+   */
+  private static class DataBucket {
+    private final List<HoodieRecord> records;
+    private final BufferSizeDetector detector;
+
+    private DataBucket(Double batchSize) {
+      this.records = new ArrayList<>();
+      this.detector = new BufferSizeDetector(batchSize);
+    }
+
+    public void reset() {
+      this.records.clear();
+      this.detector.reset();
+    }
+  }
+
   /**
    * Tool to detect if to flush out the existing buffer.
    * Sampling the record to compute the size with 0.01 percentage.
@@ -314,32 +331,62 @@ void reset() {
     }
   }

-  private void putDataIntoBuffer(I value) {
+  /**
+   * Returns the bucket ID for the given {@code value}.
+   */
+  private String getBucketID(I value) {
     HoodieRecord record = (HoodieRecord) value;
     final String fileId = record.getCurrentLocation().getFileId();
-    final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
-    if (!this.buffer.containsKey(key)) {
-      this.buffer.put(key, new ArrayList<>());
-    }
-    this.buffer.get(key).add(record);
+    return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
   }

   /**
-   * Flush the data buffer if the buffer size is greater than
+   * Buffers the given record.
+   *
+   * <p>Flushes the data bucket first if its buffered size is greater than
    * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
    *
    * @param value HoodieRecord
    */
-  private void flushBufferOnCondition(I value) {
-    boolean needFlush = this.detector.detect(value);
+  private void bufferRecord(I value) {
+    final String bucketID = getBucketID(value);
+
+    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
+    boolean needFlush = bucket.detector.detect(value);
     if (needFlush) {
-      flushBuffer(false, false);
-      this.detector.reset();
+      flushBucket(bucket);
+      bucket.reset();
     }
+    bucket.records.add((HoodieRecord) value);
+  }
+
+  @SuppressWarnings("unchecked, rawtypes")
+  private void flushBucket(DataBucket bucket) {
+    this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    if (this.currentInstant == null) {
+      // in case there are empty checkpoints that have no input data
+      LOG.info("No inflight instant when flushing data, cancel.");
+      return;
+    }
+    List<HoodieRecord> records = bucket.records;
+    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffered records");
+    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+    }
+    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, currentInstant));
+    final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(writeStatus)
+        .isLastBatch(false)
+        .isEndInput(false)
+        .build();
+    this.eventGateway.sendEventToCoordinator(event);
   }

   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
+  private void flushRemaining(boolean isEndInput) {
     this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
@@ -347,12 +394,13 @@ private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
       return;
     }
     final List<WriteStatus> writeStatus;
-    if (buffer.size() > 0) {
+    if (buckets.size() > 0) {
       writeStatus = new ArrayList<>();
-      this.buffer.values()
+      this.buckets.values()
          // The records are partitioned by the bucket ID and each batch sent to
          // the writer belongs to one bucket.
-          .forEach(records -> {
+          .forEach(bucket -> {
+            List<HoodieRecord> records = bucket.records;
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
                 records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
               }
@@ -368,11 +416,11 @@
         .taskID(taskID)
         .instantTime(currentInstant)
         .writeStatus(writeStatus)
-        .isLastBatch(isLastBatch)
+        .isLastBatch(true)
         .isEndInput(isEndInput)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
-    this.buffer.clear();
+    this.buckets.clear();
     this.currentInstant = "";
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 1167779ffaf60..a98bd3d3804a8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -399,6 +399,7 @@ public void testInsertWithMiniBatches() throws Exception {

     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
+    dataBuffer = funcWrapper.getDataBuffer();
     assertThat("All data should be flushed out", dataBuffer.size(), is(0));

     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
@@ -430,10 +431,8 @@ public void testInsertWithMiniBatches() throws Exception {
     final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
     final OperatorEvent event4 = funcWrapper.getNextEvent();
-    final OperatorEvent event5 = funcWrapper.getNextEvent();

     funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
     funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event5);

     funcWrapper.checkpointComplete(2);
     // Same the original base file content.
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index b80ac72b5c4a2..783f78599fbe0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -137,7 +137,7 @@ public OperatorEvent getNextEvent() {
   }

   public Map<String, List<HoodieRecord>> getDataBuffer() {
-    return this.writeFunction.getBuffer();
+    return this.writeFunction.getDataBuffer();
   }

   @SuppressWarnings("rawtypes")
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 2f759b870b9d2..992495642e517 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -33,11 +33,10 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;

-import java.io.BufferedReader;
-import java.io.FileReader;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -151,15 +151,8 @@ public void cancel() {
   }

   private void loadDataBuffer() {
-    this.dataBuffer = new ArrayList<>();
-    try (BufferedReader reader =
-        new BufferedReader(new FileReader(this.path.toString()))) {
-      String line = reader.readLine();
-      while (line != null) {
-        this.dataBuffer.add(line);
-        // read next line
-        line = reader.readLine();
-      }
+    try {
+      this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
     } catch (IOException e) {
       throw new RuntimeException("Read file " + this.path + " error", e);
     }
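The pattern this patch introduces (one size detector per bucket, flush only the
bucket that crossed the threshold, then flush whatever remains on checkpoint or
end of input) can be sketched independently of Flink and Hudi. The following
Java sketch is illustrative only: PerBucketBuffer, SizeDetector, Bucket, the
sizeOf function, and the flush callback are hypothetical names, not part of the
patch; the 0.01 sampling rate mirrors the rate quoted in the BufferSizeDetector
javadoc above.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.function.Consumer;
    import java.util.function.ToLongFunction;

    /** Minimal sketch of per-bucket mini-batch buffering; all names are illustrative. */
    public final class PerBucketBuffer<R> {

      /** Estimates buffered bytes by sampling ~1% of records, reusing the last sample otherwise. */
      static final class SizeDetector<R> {
        private static final double SAMPLE_RATE = 0.01;
        private final long batchSizeBytes;
        private final ToLongFunction<R> sizeOf;
        private final Random random = new Random();
        private long totalSize;
        private long lastRecordSize = -1L;

        SizeDetector(long batchSizeBytes, ToLongFunction<R> sizeOf) {
          this.batchSizeBytes = batchSizeBytes;
          this.sizeOf = sizeOf;
        }

        /** Returns true once the estimated buffered size crosses the batch threshold. */
        boolean detect(R record) {
          if (lastRecordSize == -1L || random.nextDouble() < SAMPLE_RATE) {
            lastRecordSize = sizeOf.applyAsLong(record); // sample the real size
          }
          totalSize += lastRecordSize;                   // otherwise reuse the last sample
          return totalSize > batchSizeBytes;
        }

        void reset() {
          totalSize = 0L;
          lastRecordSize = -1L;
        }
      }

      /** One bucket: its buffered records plus its own detector. */
      static final class Bucket<R> {
        final List<R> records = new ArrayList<>();
        final SizeDetector<R> detector;

        Bucket(long batchSizeBytes, ToLongFunction<R> sizeOf) {
          this.detector = new SizeDetector<>(batchSizeBytes, sizeOf);
        }

        void reset() {
          records.clear();
          detector.reset();
        }
      }

      private final Map<String, Bucket<R>> buckets = new HashMap<>();
      private final long batchSizeBytes;
      private final ToLongFunction<R> sizeOf;
      private final Consumer<List<R>> flush; // stands in for the actual write path

      public PerBucketBuffer(long batchSizeBytes, ToLongFunction<R> sizeOf,
                             Consumer<List<R>> flush) {
        this.batchSizeBytes = batchSizeBytes;
        this.sizeOf = sizeOf;
        this.flush = flush;
      }

      /** Buffers a record; flushes only the bucket whose estimated size crossed the limit. */
      public void buffer(String bucketId, R record) {
        Bucket<R> bucket = buckets.computeIfAbsent(
            bucketId, k -> new Bucket<>(batchSizeBytes, sizeOf));
        // Same order as bufferRecord above: detect first, flush the full bucket,
        // then add the incoming record to the now-empty bucket.
        if (bucket.detector.detect(record)) {
          flush.accept(new ArrayList<>(bucket.records));
          bucket.reset();
        }
        bucket.records.add(record);
      }

      /** Flushes all non-empty buckets, e.g. on checkpoint or end of input. */
      public void flushRemaining() {
        buckets.values().forEach(bucket -> {
          if (!bucket.records.isEmpty()) {
            flush.accept(new ArrayList<>(bucket.records));
          }
        });
        buckets.clear();
      }
    }

Keyed the same way as StreamerUtil.generateBucketKey (partition path plus file
ID), this gives the behavior the commit message describes: a bucket holding
only a few records is never flushed early just because another bucket grew
past the batch size.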