Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(BOUNDED_IN_MEMORY.name())
.withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
+ "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
+ "to gain better writing performance if lock was the bottleneck. Although DISRUPTOR_EXECUTOR is still an experimental feature.");
+ "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded in-memory queue, this queue will use extra lock to balance producers and consumer"
+ "DISRUPTOR: Use disruptor which a lock free message queue as inner message, this queue may gain better writing performance if lock was the bottleneck. "
+ "SIMPLE: Executor with no inner message queue and no inner lock. Consuming and writing records from iterator directly. Compared with BIM and DISRUPTOR, "
+ "this queue has no need for additional memory and cpu resources due to lock or multithreading, but also lost some benefits such as speed limit. "
+ "Although DISRUPTOR_EXECUTOR and SIMPLE are still in experimental.");

public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.QueueBasedExecutorFactory;
import org.apache.hudi.util.ExecutorFactory;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
|| !isPureProjection
|| baseFile.getBootstrapBaseFile().isPresent();

HoodieExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
HoodieExecutor<Void> wrapper = null;

try {
Iterator<HoodieRecord> recordIterator;
Expand All @@ -135,7 +135,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
recordSchema = isPureProjection ? writerSchema : readerSchema;
}

wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into queue of QueueBasedExecutorFactory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieExecutor;
Expand All @@ -30,16 +31,16 @@
import java.util.Iterator;
import java.util.function.Function;

public class QueueBasedExecutorFactory {
public class ExecutorFactory {

public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction) {
return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
}

public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieConfig,
public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
Iterator<I> inputItr,
HoodieConsumer<O, E> consumer,
Function<I, O> transformFunction,
Expand All @@ -53,6 +54,8 @@ public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig hoodieC
case DISRUPTOR:
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
case SIMPLE:
return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.QueueBasedExecutorFactory;
import org.apache.hudi.util.ExecutorFactory;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -56,10 +56,11 @@ public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), getCloningTransformer(schema, hoodieConfig));
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
getCloningTransformer(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
checkState(result != null && !result.isEmpty());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.QueueBasedExecutorFactory;
import org.apache.hudi.util.ExecutorFactory;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -59,12 +59,12 @@ public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
HoodieExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
checkState(result != null && !result.isEmpty());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hudi.util.QueueBasedExecutorFactory;
import org.apache.hudi.util.ExecutorFactory;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -79,14 +79,14 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}

bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

final List<WriteStatus> result = bufferedIteratorExecutor.execute();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.execution;

import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import scala.Tuple2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {
Copy link
Contributor Author

@zhangyue19921010 zhangyue19921010 Dec 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @@alexeykudinkin
Not involved testInterruptExecutor in this SimpleExecutionSpark Test.
Bcz there is no inner Thread pool in this new SimpleExecutor so there is nothing to interrupt right?

Is that looks good to u ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense


private final String instantTime = HoodieActiveTimeline.createNewInstantTime();

@BeforeEach
public void setUp() throws Exception {
initTestDataGenerator();
initExecutorServiceWithFixedThreadPool(2);
}

@AfterEach
public void tearDown() throws Exception {
cleanupResources();
}

@Test
public void testExecutor() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
final List<HoodieRecord> consumedRecords = new ArrayList<>();

HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
private int count = 0;

@Override
public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception {
consumedRecords.add(record.getResult());
count++;
}

@Override
public Integer finish() {
return count;
}
};
SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;

try {
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));

int result = exec.execute();
// It should buffer and write 128 records
assertEquals(128, result);

// collect all records and assert that consumed records are identical to produced ones
// assert there's no tampering, and that the ordering is preserved
assertEquals(hoodieRecords, consumedRecords);

} finally {
if (exec != null) {
exec.shutdownNow();
}
}
}

// Test to ensure that we are reading all records from queue iterator in the same order
// without any exceptions.
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
public void testRecordReading() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();

hoodieRecords.forEach(record -> {
final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
beforeRecord.add(originalRecord);
try {
final Option<IndexedRecord> originalInsertValue =
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
beforeIndexedRecord.add(originalInsertValue.get());
} catch (IOException e) {
// ignore exception here.
}
});

HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
private int count = 0;

@Override
public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception {
count++;
afterRecord.add((HoodieAvroRecord) record.getResult());
try {
IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.getResult())
.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
afterIndexedRecord.add(indexedRecord);
} catch (IOException e) {
//ignore exception here.
}
}

@Override
public Integer finish() {
return count;
}
};

SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;

try {
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
int result = exec.execute();
assertEquals(100, result);

assertEquals(beforeRecord, afterRecord);
assertEquals(beforeIndexedRecord, afterIndexedRecord);

} finally {
if (exec != null) {
exec.shutdownNow();
}
}
}

/**
* Test to ensure exception happend in iterator then we need to stop the simple ingestion.
*/
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
public void testException() {
final int numRecords = 1000;
final String errorMessage = "Exception when iterating records!!!";

List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);
InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, numRecords / 10);

HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
int count = 0;

@Override
public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) throws Exception {
// Read recs and ensure we have covered all producer recs.
final HoodieRecord rec = payload.getResult();
count++;
}

@Override
public Integer finish() {
return count;
}
};

SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec =
new SimpleHoodieExecutor(iterator, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));

final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
"exception is expected");
assertTrue(thrown.getMessage().contains(errorMessage));
}

class InnerIterator implements Iterator<HoodieRecord> {

private Iterator<HoodieRecord> iterator;
private AtomicInteger count = new AtomicInteger(0);
private String errorMessage;
private int errorMessageCount;

public InnerIterator(Iterator<HoodieRecord> iterator, String errorMessage, int errorMessageCount) {
this.iterator = iterator;
this.errorMessage = errorMessage;
this.errorMessageCount = errorMessageCount;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public HoodieRecord next() {
if (count.get() == errorMessageCount) {
throw new HoodieException(errorMessage);
}

count.incrementAndGet();
return iterator.next();
}
}
}
Loading