Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;

/**
Expand Down Expand Up @@ -138,6 +139,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue(BOUNDED_IN_MEMORY.name())
.withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
+ "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
Expand Down Expand Up @@ -1000,7 +1002,7 @@ public String getKeyGeneratorClass() {
}

public ExecutorType getExecutorType() {
return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
}

public boolean isCDCEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
Expand All @@ -27,33 +34,26 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.util.QueueBasedExecutorFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -91,7 +91,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
readSchema = mergeHandle.getWriterSchemaWithMetaFields();
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());

Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
Expand Down Expand Up @@ -137,13 +137,14 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
new UpdateHandler(mergeHandle), record -> {

wrapper = QueueBasedExecutorFactory.create(table.getConfig(), readerIterator, new UpdateHandler(mergeHandle), record -> {
if (!externalSchemaTransformation) {
return record;
}
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, record);
}, table.getPreExecuteRunnable());

wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
Expand Down
5 changes: 4 additions & 1 deletion packaging/hudi-spark-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@
<include>org.jetbrains.kotlin:*</include>
<include>org.rocksdb:rocksdbjni</include>
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>

<include>com.lmax:disruptor</include>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we want to shade this lib? it's purely internal right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to shade it; it's unlikely that other engines will be using it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me add an open question to the bundle standards, where we need to publish some guidelines for shading.

<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.github.ben-manes.caffeine:caffeine</include>
<include>org.apache.parquet:parquet-avro</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>

<include>io.dropwizard.metrics:metrics-core</include>
<include>io.dropwizard.metrics:metrics-graphite</include>
<include>io.dropwizard.metrics:metrics-jmx</include>
Expand Down Expand Up @@ -127,6 +129,7 @@
<include>org.apache.hbase.thirdparty:hbase-shaded-netty</include>
<include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include>
<include>org.apache.htrace:htrace-core4</include>

<include>org.apache.curator:curator-framework</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-recipes</include>
Expand Down
1 change: 1 addition & 0 deletions packaging/hudi-utilities-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>

<include>com.lmax:disruptor</include>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.github.ben-manes.caffeine:caffeine</include>
Expand Down