Merged

Commits (29)
767ddd8
Fixed incorrect use of `CountDownLatch`
Nov 18, 2022
4ebf875
Removed unnecessary casting
Nov 18, 2022
77d7e9d
Adding logs
Nov 18, 2022
a4672e0
Unified `getResult` and `finish` into one API
Nov 18, 2022
d81602a
Simplifying `DisruptorExecutor`
Nov 18, 2022
14f2956
`BoundedInMemoryQueueIterable` > `BoundedInMemoryQueue`
Nov 18, 2022
326ed2c
Lifted all queue lifecycle management to `HoodieExecutorBase`
Nov 18, 2022
a267452
Tidying up
Nov 18, 2022
aee1b84
Rebased direct users of the BIMQ to rely on `QueueBasedExecutorFactor…
Nov 18, 2022
280ff3a
Tidying up
Nov 18, 2022
034c123
Close the queue after processing is done
Nov 18, 2022
ef42ebf
Cleaning up superfluous APIs
Nov 18, 2022
b286f0f
Adding missing deps to bundles
Nov 18, 2022
09ca087
Tidying up
Nov 18, 2022
d3726e4
Moved actual queue iteration into `BoundedInMemoryQueueExecutor`
Nov 18, 2022
9f9aad6
Cleaned up `IteratorBasedQueueConsumer`
Nov 18, 2022
25218aa
Revisited `BoundedInMemoryExecutor` bootstrapping w/o consumer;
Nov 18, 2022
ec5b19c
Streamline and unify producer handling w/in Queue-based executors
Nov 18, 2022
9028449
Abstracted common producing seq
Nov 18, 2022
30c4d85
Delineated sealing/closing of the queue (addressing the issue of shut…
Nov 18, 2022
4214901
Tidying up
Nov 18, 2022
7779c5f
Lifted up all concurrency management to `BaseHoodieQueueBasedExecutor`;
Nov 18, 2022
c2b9d85
Decouple consuming seq from returning the result, to address the issu…
Nov 18, 2022
4310334
Relocated Disruptor queue setup under `doConsume`
Nov 18, 2022
a3c2c11
Tidying up
Nov 18, 2022
8c2a34a
Replace `join` w/ `get` to be able to handle `InterruptedException`
Nov 30, 2022
ccf65c1
Simplify the test
Nov 30, 2022
d05236b
Tidying up
Nov 30, 2022
e4652c0
Fixed tests
Nov 30, 2022
@@ -22,7 +22,7 @@
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
 import org.apache.hudi.io.HoodieWriteHandle;
@@ -34,25 +34,27 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
  */
 public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
-    extends IteratorBasedQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+    implements HoodieConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
 
-  private HoodieWriteConfig config;
-  private String instantTime;
-  private boolean areRecordsSorted;
-  private HoodieTable hoodieTable;
-  private String idPrefix;
-  private TaskContextSupplier taskContextSupplier;
-  private WriteHandleFactory writeHandleFactory;
+  private final HoodieWriteConfig config;
+  private final String instantTime;
+  private final boolean areRecordsSorted;
+  private final HoodieTable hoodieTable;
+  private final String idPrefix;
+  private final TaskContextSupplier taskContextSupplier;
+  private final WriteHandleFactory writeHandleFactory;
 
   private final List<WriteStatus> statuses = new ArrayList<>();
   // Stores the open HoodieWriteHandle for each table partition path
   // If the records are consumed in order, there should be only one open handle in this mapping.
   // Otherwise, there may be multiple handles.
-  private Map<String, HoodieWriteHandle> handles = new HashMap<>();
+  private final Map<String, HoodieWriteHandle> handles = new HashMap<>();
 
   public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
                                   boolean areRecordsSorted, HoodieTable hoodieTable, String idPrefix,
@@ -68,7 +70,7 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
   }
 
   @Override
-  public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
+  public void consume(HoodieInsertValueGenResult<HoodieRecord> payload) {
     final HoodieRecord insertPayload = payload.record;
     String partitionPath = insertPayload.getPartitionPath();
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
@@ -97,13 +99,9 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
   }
 
   @Override
-  public void finish() {
+  public List<WriteStatus> finish() {
     closeOpenHandles();
-    assert statuses.size() > 0;
-  }
-
-  @Override
-  public List<WriteStatus> getResult() {
+    checkState(statuses.size() > 0);
     return statuses;
   }
 
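Taken together, the changes above (and the analogous ones in the files below) replace subclassing of IteratorBasedQueueConsumer, with its separate finish() and getResult() steps, by implementation of the single HoodieConsumer interface whose finish() returns the result directly. The interface itself lives in hudi-common and is not part of this diff; inferred from the call sites shown here, it presumably looks roughly like this:

// Sketch of the consolidated consumer contract, inferred from its usage
// in this PR -- not copied from the Hudi sources.
public interface HoodieConsumer<I, O> {

  // Consume a single record handed over by the executor.
  void consume(I record);

  // Finalize consumption and return the result; this one method replaces
  // the former finish()/getResult() pair.
  O finish();
}

Folding the two methods into one removes the implicit ordering contract between finish() and getResult() that callers of the old API had to respect (compare the commit "Decouple consuming seq from returning the result").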
@@ -305,7 +305,7 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
     return writeRecord(hoodieRecord, indexedRecord, false);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
+  private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
     Option recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -20,7 +20,7 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 
@@ -29,7 +29,7 @@
 /**
  * Consumer that dequeues records from queue and sends to Merge Handle for writing.
  */
-public class BootstrapRecordConsumer extends IteratorBasedQueueConsumer<HoodieRecord, Void> {
+public class BootstrapRecordConsumer implements HoodieConsumer<HoodieRecord, Void> {
 
   private final HoodieBootstrapHandle bootstrapHandle;
 
@@ -38,7 +38,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
   }
 
   @Override
-  public void consumeOneRecord(HoodieRecord record) {
+  public void consume(HoodieRecord record) {
     try {
       bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
           .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
@@ -48,10 +48,7 @@ public void consumeOneRecord(HoodieRecord record) {
   }
 
   @Override
-  public void finish() {}
-
-  @Override
-  protected Void getResult() {
+  public Void finish() {
     return null;
   }
 }
@@ -23,7 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -65,7 +65,7 @@ protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> tab
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
-  protected static class UpdateHandler extends IteratorBasedQueueConsumer<GenericRecord, Void> {
+  protected static class UpdateHandler implements HoodieConsumer<GenericRecord, Void> {
 
    private final HoodieMergeHandle upsertHandle;
 
@@ -74,15 +74,12 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) {
    }
 
    @Override
-    public void consumeOneRecord(GenericRecord record) {
+    public void consume(GenericRecord record) {
      upsertHandle.write(record);
    }
 
    @Override
-    public void finish() {}
-
-    @Override
-    protected Void getResult() {
+    public Void finish() {
      return null;
    }
  }
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
 import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
@@ -31,11 +31,18 @@
 
 public class QueueBasedExecutorFactory {
[Review thread on the line above]

Contributor: Maybe we can also rename QueueBasedExecutorFactory to ExecutorFactory, because we will support SimpleExecutor here, which has no inner queue.

Contributor (author): Agreed. Let's take this up in the PR introducing SimpleExecutor.
   /**
    * Create a new hoodie executor instance on demand.
    */
-  public static <I, O, E> HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
-                                                Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+  public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
+                                                         Iterator<I> inputItr,
+                                                         HoodieConsumer<O, E> consumer,
+                                                         Function<I, O> transformFunction) {
+    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+  }
+
+  public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
+                                                         Iterator<I> inputItr,
+                                                         HoodieConsumer<O, E> consumer,
+                                                         Function<I, O> transformFunction,
+                                                         Runnable preExecuteRunnable) {
     ExecutorType executorType = hoodieConfig.getExecutorType();
 
     switch (executorType) {
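The refactored factory now exposes two overloads, with the new one defaulting preExecuteRunnable to Functions.noop(). A hypothetical call site is sketched below; the wrapper class, method, and parameter names are illustrative and not part of this PR, and the transform type is assumed to be java.util.function.Function:

import java.util.Iterator;
import java.util.function.Function;

import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.QueueBasedExecutorFactory;

// Illustrative wrapper showing the post-PR lifecycle: create the executor,
// call execute(), and receive the consumer's finish() result.
class ExecutorFactoryUsage {
  static <I, O, E> E runViaFactory(HoodieWriteConfig config,
                                   Iterator<I> records,
                                   HoodieConsumer<O, E> consumer,
                                   Function<I, O> transform) {
    try {
      // No explicit pre-execute hook: this overload passes Functions.noop().
      HoodieExecutor<I, O, E> executor =
          QueueBasedExecutorFactory.create(config, records, consumer, transform);
      // execute() drains `records`, applies `transform`, feeds `consumer`,
      // and returns whatever the consumer's finish() produced.
      return executor.execute();
    } catch (Exception e) {
      // Mirrors the computeNext() call sites elsewhere in this PR.
      throw new HoodieException(e);
    }
  }
}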
@@ -21,17 +21,19 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieWriteHandle;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle.
  */
 public class ExplicitWriteHandler<T extends HoodieRecordPayload>
-    extends IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+    implements HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
 
   private final List<WriteStatus> statuses = new ArrayList<>();
 
@@ -42,19 +44,15 @@ public ExplicitWriteHandler(HoodieWriteHandle handle) {
   }
 
   @Override
-  public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
+  public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
     final HoodieRecord insertPayload = payload.record;
     handle.write(insertPayload, payload.insertValue, payload.exception);
   }
 
   @Override
-  public void finish() {
+  public List<WriteStatus> finish() {
     closeOpenHandle();
-    assert statuses.size() > 0;
-  }
-
-  @Override
-  public List<WriteStatus> getResult() {
+    checkState(statuses.size() > 0);
     return statuses;
   }
 
@@ -18,24 +18,24 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.util.QueueBasedExecutorFactory;
+
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * Flink lazy iterable that supports explicit write handler.
  *
@@ -57,14 +57,13 @@ public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
-        null;
+    HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
-      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
-          Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
+      bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
+          getTransformFunction(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
-      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
+      checkState(result != null && !result.isEmpty());
       return result;
     } catch (Exception e) {
       throw new HoodieException(e);
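Note that the swap of assert for checkState in this and the following computeNext() implementations is behavioral, not cosmetic: assert statements are stripped unless the JVM runs with -ea, so the old validation was silently skipped in typical production runs, whereas checkState always executes. (The isRemaining() clause also disappears from the check, in line with the "Cleaning up superfluous APIs" commit.) A minimal sketch of the semantics assumed of ValidationUtils.checkState; Hudi's actual helper may differ in overloads and messages:

// Minimal stand-in illustrating the assumed checkState semantics;
// not the actual org.apache.hudi.common.util.ValidationUtils source.
public final class ValidationUtilsSketch {
  private ValidationUtilsSketch() {}

  // Unlike `assert`, this check always runs, regardless of JVM flags.
  public static void checkState(boolean expression) {
    if (!expression) {
      throw new IllegalStateException();
    }
  }
}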
@@ -18,23 +18,23 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.util.QueueBasedExecutorFactory;
+
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class JavaLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
   public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                 boolean areRecordsSorted,
@@ -60,14 +60,14 @@ public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
+    HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
         null;
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
-      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
+      checkState(result != null && !result.isEmpty());
       return result;
     } catch (Exception e) {
       throw new HoodieException(e);
@@ -35,6 +35,8 @@
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
 
   private boolean useWriterSchema;
@@ -89,7 +91,7 @@ protected List<WriteStatus> computeNext() {
           getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
 
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
-      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
+      checkState(result != null && !result.isEmpty());
       return result;
     } catch (Exception e) {
       throw new HoodieException(e);