Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
Expand Down Expand Up @@ -248,16 +249,19 @@ private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadIns
int instantsInPreviousFile = instantsInRange.size();
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
try (ClosableIterator<IndexedRecord> itr = blk.getRecordItr()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c))
.forEach(instantsInRange::add);
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c))
.forEach(instantsInRange::add);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
Expand All @@ -43,7 +42,6 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -431,31 +429,6 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
}

/**
* The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing.
*/
private void sendCommitAckEvents(long checkpointId) {
CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
.map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
.toArray(CompletableFuture<?>[]::new);
CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
if (!sendToFinishedTasks(error)) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
}
});
}

/**
* Decides whether the given exception is caused by sending events to FINISHED tasks.
*
* <p>Ugly impl: the exception may change in the future.
*/
private static boolean sendToFinishedTasks(Throwable throwable) {
return throwable.getCause() instanceof TaskNotRunningException
|| throwable.getCause().getMessage().contains("running");
}

/**
* Commits the instant.
*/
Expand Down Expand Up @@ -483,8 +456,6 @@ private boolean commitInstant(String instant, long checkpointId) {
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
// Send commit ack event to the write function to unblock the flushing
sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);
Expand Down