112 changes: 80 additions & 32 deletions hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -26,6 +26,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -47,6 +48,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -100,9 +102,9 @@ public class StreamWriteFunction<K, I, O>
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);

/**
* Write buffer for a checkpoint.
* Write buffer as buckets for a checkpoint. The key is bucket ID.
*/
private transient Map<String, List<HoodieRecord>> buffer;
private transient Map<String, DataBucket> buckets;

/**
* The buffer lock to control data buffering/flushing.
@@ -146,11 +148,6 @@ public class StreamWriteFunction<K, I, O>
*/
private transient OperatorEventGateway eventGateway;

/**
* The detector that tells if to flush the data as mini-batch.
*/
private transient BufferSizeDetector detector;

/**
* Constructs a StreamingSinkFunction.
*
@@ -163,7 +160,6 @@ public StreamWriteFunction(Configuration config) {
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
initBuffer();
initWriteClient();
initWriteFunction();
@@ -182,7 +178,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
// it would check the validity.
this.onCheckpointing = true;
// wait for the buffer data flush out and request a new instant
flushBuffer(true, false);
flushRemaining(false);
// signal the task thread to start buffering
addToBufferCondition.signal();
} finally {
@@ -198,8 +194,7 @@ public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, C
if (onCheckpointing) {
addToBufferCondition.await();
}
flushBufferOnCondition(value);
putDataIntoBuffer(value);
bufferRecord(value);
} finally {
bufferLock.unlock();
}
@@ -221,7 +216,7 @@ public void notifyCheckpointComplete(long checkpointId) {
* End input action for batch source.
*/
public void endInput() {
flushBuffer(true, true);
flushRemaining(true);
this.writeClient.cleanHandles();
}

@@ -231,8 +226,12 @@ public void endInput() {

@VisibleForTesting
@SuppressWarnings("rawtypes")
public Map<String, List<HoodieRecord>> getBuffer() {
return buffer;
public Map<String, List<HoodieRecord>> getDataBuffer() {
Map<String, List<HoodieRecord>> ret = new HashMap<>();
for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
ret.put(entry.getKey(), entry.getValue().records);
}
return ret;
}

@VisibleForTesting
@@ -250,7 +249,7 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
// -------------------------------------------------------------------------

private void initBuffer() {
this.buffer = new LinkedHashMap<>();
this.buckets = new LinkedHashMap<>();
this.bufferLock = new ReentrantLock();
this.addToBufferCondition = this.bufferLock.newCondition();
}
@@ -278,6 +277,24 @@ private void initWriteFunction() {
}
}

/**
* Data bucket.
*/
private static class DataBucket {
private final List<HoodieRecord> records;
private final BufferSizeDetector detector;

private DataBucket(Double batchSize) {
this.records = new ArrayList<>();
this.detector = new BufferSizeDetector(batchSize);
}

public void reset() {
this.records.clear();
this.detector.reset();
}
}

/**
* Tool to detect whether to flush out the existing buffer.
* Samples the records to compute the buffer size, at a sampling rate of 0.01 (1%).
@@ -314,45 +331,76 @@ void reset() {
}
}

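The body of BufferSizeDetector is collapsed in this view. Judging from its javadoc and from the detect()/reset() calls visible in bufferRecord() below, a sampling detector of this kind might look roughly like the following sketch. It is a hypothetical reconstruction, not the class from this patch: the fields, the seed, and the byte math are assumptions; only ObjectSizeCalculator (imported at the top of the file) comes from the diff.

```java
import org.apache.hudi.common.util.ObjectSizeCalculator;

import java.util.Random;

// Hypothetical reconstruction of a sampling-based size detector: it measures
// roughly 1% of the records (the 0.01 rate from the javadoc) and reuses the
// last sampled size for the rest, trading accuracy for cheap size tracking.
class BufferSizeDetectorSketch {
  private final Random random = new Random(47); // seed is an arbitrary choice
  private final double batchSizeBytes;          // threshold, assuming WRITE_BATCH_SIZE is in MB

  private long lastRecordSize = -1L;            // size of the most recently sampled record
  private long totalSize = 0L;                  // estimated bytes buffered so far

  BufferSizeDetectorSketch(double batchSizeMb) {
    this.batchSizeBytes = batchSizeMb * 1024 * 1024;
  }

  // Returns true when the estimated buffered size crosses the threshold.
  boolean detect(Object record) {
    if (lastRecordSize == -1L || sampling()) {
      lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
    }
    totalSize += lastRecordSize;
    return totalSize > batchSizeBytes;
  }

  // True for about 1 record in 100.
  private boolean sampling() {
    return random.nextDouble() < 0.01;
  }

  void reset() {
    this.lastRecordSize = -1L;
    this.totalSize = 0L;
  }
}
```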
private void putDataIntoBuffer(I value) {
/**
* Returns the bucket ID for the given {@code value}.
*/
private String getBucketID(I value) {
HoodieRecord<?> record = (HoodieRecord<?>) value;
final String fileId = record.getCurrentLocation().getFileId();
final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
if (!this.buffer.containsKey(key)) {
this.buffer.put(key, new ArrayList<>());
}
this.buffer.get(key).add(record);
return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
}

/**
* Flush the data buffer if the buffer size is greater than
* Buffers the given record.
*
* <p>Flushes the data bucket first if the bucket's data size is greater than
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
*
* @param value HoodieRecord
*/
private void flushBufferOnCondition(I value) {
boolean needFlush = this.detector.detect(value);
private void bufferRecord(I value) {
final String bucketID = getBucketID(value);

DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
boolean needFlush = bucket.detector.detect(value);
if (needFlush) {
flushBuffer(false, false);
this.detector.reset();
flushBucket(bucket);
bucket.reset();
}
bucket.records.add((HoodieRecord<?>) value);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void flushBucket(DataBucket bucket) {
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
if (this.currentInstant == null) {
// in case there are empty checkpoints that have no input data
LOG.info("No inflight instant when flushing data, cancel.");
return;
}
List<HoodieRecord> records = bucket.records;
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffered records");
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, currentInstant));
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.isLastBatch(false)
.isEndInput(false)
.build();
this.eventGateway.sendEventToCoordinator(event);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
private void flushRemaining(boolean isEndInput) {
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
if (this.currentInstant == null) {
// in case there are empty checkpoints that have no input data
LOG.info("No inflight instant when flushing data, cancel.");
return;
}
final List<WriteStatus> writeStatus;
if (buffer.size() > 0) {
if (buckets.size() > 0) {
writeStatus = new ArrayList<>();
this.buffer.values()
this.buckets.values()
// The records are partitioned by the bucket ID and each batch sent to
// the writer belongs to one bucket.
.forEach(records -> {
.forEach(bucket -> {
List<HoodieRecord> records = bucket.records;
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
@@ -368,11 +416,11 @@ private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.isLastBatch(isLastBatch)
.isLastBatch(true)
.isEndInput(isEndInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buffer.clear();
this.buckets.clear();
this.currentInstant = "";
}
}
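Taken together, the changes to StreamWriteFunction.java replace the single flat write buffer (a Map<String, List<HoodieRecord>>) with one DataBucket per bucket ID (partition path plus file ID). Each bucket owns its own BufferSizeDetector and is flushed on its own through flushBucket(), while snapshotState() and endInput() drain whatever is left through flushRemaining(). Below is a dependency-free model of that control flow, where every name and the byte accounting are illustrative rather than Hudi API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of the per-bucket buffering above; every name here is
// illustrative, not Hudi API.
class BucketedWriteBuffer {
  private final long batchSizeBytes;                                 // per-bucket flush threshold
  private final Map<String, Bucket> buckets = new LinkedHashMap<>(); // bucket ID -> bucket

  BucketedWriteBuffer(long batchSizeBytes) {
    this.batchSizeBytes = batchSizeBytes;
  }

  private static class Bucket {
    final List<String> records = new ArrayList<>();
    long sizeBytes = 0L;
  }

  // Mirrors bufferRecord(): only the bucket that crossed its own threshold is
  // flushed; the other buckets keep accumulating.
  void bufferRecord(String bucketId, String record, long recordSizeBytes) {
    Bucket bucket = buckets.computeIfAbsent(bucketId, k -> new Bucket());
    if (bucket.sizeBytes + recordSizeBytes > batchSizeBytes) {
      flushBucket(bucketId, bucket);
    }
    bucket.records.add(record);
    bucket.sizeBytes += recordSizeBytes;
  }

  // Mirrors flushRemaining(): on checkpoint or end of input, drain every bucket.
  void flushRemaining() {
    buckets.forEach(this::flushBucket);
    buckets.clear();
  }

  private void flushBucket(String bucketId, Bucket bucket) {
    if (!bucket.records.isEmpty()) {
      // A real implementation would hand the records to the write client here.
      System.out.println("flushing " + bucket.records.size() + " records of bucket " + bucketId);
      bucket.records.clear();
      bucket.sizeBytes = 0L;
    }
  }
}
```

Because each bucket is now checked against its own threshold instead of one global size check, the same input can trigger fewer mini-batch flushes; the test hunk below, which now expects one fewer write event, is consistent with that.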
@@ -399,6 +399,7 @@ public void testInsertWithMiniBatches() throws Exception {

// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));

final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
@@ -430,10 +431,8 @@

final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
final OperatorEvent event5 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
funcWrapper.getCoordinator().handleEventFromOperator(0, event5);
funcWrapper.checkpointComplete(2);

// Same as the original base file content.
@@ -137,7 +137,7 @@ public OperatorEvent getNextEvent() {
}

public Map<String, List<HoodieRecord>> getDataBuffer() {
return this.writeFunction.getBuffer();
return this.writeFunction.getDataBuffer();
}

@SuppressWarnings("rawtypes")
@@ -33,11 +33,11 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@@ -151,15 +151,9 @@ public void cancel() {
}

private void loadDataBuffer() {
this.dataBuffer = new ArrayList<>();
try (BufferedReader reader =
new BufferedReader(new FileReader(this.path.toString()))) {
String line = reader.readLine();
while (line != null) {
this.dataBuffer.add(line);
// read next line
line = reader.readLine();
}
try {
this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
} catch (IOException e) {
throw new RuntimeException("Read file " + this.path + " error", e);
}
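Files.readAllLines already fails with NoSuchFileException (an IOException) when the file is missing, and decodes UTF-8 by default, so no separate existence check is needed before the call. A minimal self-contained equivalent of the new loader, with illustrative class and method names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Illustrative stand-in for the rewritten loadDataBuffer().
final class DataBufferLoader {
  private DataBufferLoader() {
  }

  static List<String> load(Path path) {
    try {
      // Throws NoSuchFileException (an IOException) if the file is absent.
      return Files.readAllLines(path);
    } catch (IOException e) {
      throw new RuntimeException("Read file " + path + " error", e);
    }
  }
}
```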