@@ -272,8 +272,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

-  public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
-      .key("hoodie.write.executor.disruptor.buffer.size")
+  public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.write.executor.disruptor.buffer.limit.bytes")
       .defaultValue(String.valueOf(1024))
       .sinceVersion("0.13.0")
       .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");
@@ -1180,8 +1180,8 @@ public String getWriteExecutorDisruptorWaitStrategy() {
     return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
   }

-  public Integer getWriteExecutorDisruptorWriteBufferSize() {
-    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
+  public Integer getWriteExecutorDisruptorWriteBufferLimitBytes() {
+    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES));
   }

   public boolean shouldCombineBeforeInsert() {
@@ -2542,8 +2542,8 @@ public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) {
       return this;
     }

-    public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
-      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+    public Builder withWriteExecutorDisruptorWriteBufferLimitBytes(long size) {
+      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES, String.valueOf(size));
       return this;
     }

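For context, a minimal sketch of how a caller picks up the renamed setting. It assumes the HoodieWriteConfig.Builder withPath and withProps helpers and a hypothetical table path; only withWriteExecutorDisruptorWriteBufferLimitBytes and the hoodie.write.executor.disruptor.buffer.limit.bytes key come from the hunks above.

```java
import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

public class DisruptorBufferLimitExample {
  public static void main(String[] args) {
    // Renamed builder method (previously withWriteExecutorDisruptorWriteBufferSize).
    HoodieWriteConfig viaBuilder = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table") // hypothetical base path
        .withWriteExecutorDisruptorWriteBufferLimitBytes(1024) // ring buffer size, must be a power of 2
        .build();

    // Renamed key (previously hoodie.write.executor.disruptor.buffer.size).
    Properties props = new Properties();
    props.setProperty("hoodie.write.executor.disruptor.buffer.limit.bytes", "1024");
    HoodieWriteConfig viaProps = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table") // hypothetical base path
        .withProps(props)
        .build();
  }
}
```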
@@ -52,7 +52,7 @@ public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
         return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
@@ -50,7 +50,7 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {

   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(8)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(8)
       .build(false);

   @BeforeEach
@@ -94,7 +94,7 @@ public Integer finish() {
     DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;

     try {
-      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
          Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
@@ -127,7 +127,9 @@ public void testInterruptExecutor() {
       @Override
       public void consume(HoodieRecord record) {
         try {
-          Thread.currentThread().wait();
+          synchronized (this) {
+            wait();
+          }
         } catch (InterruptedException ie) {
           // ignore here
         }
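The last hunk replaces Thread.currentThread().wait() with a wait() call inside synchronized (this). For context: Object.wait() requires the calling thread to hold the monitor of the object it waits on; without it the JVM throws IllegalMonitorStateException instead of parking the consumer. A standalone sketch of the corrected idiom, with illustrative names that are not part of the patch:

```java
public class WaitIdiomSketch {

  // Parks the calling thread until another thread calls notify()/notifyAll() on
  // this object, or until the thread is interrupted.
  public void blockUntilNotified() {
    try {
      synchronized (this) { // acquire this object's monitor before calling wait()
        wait();             // releases the monitor while parked, reacquires it on wakeup
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // restore the interrupt flag for callers
    }
  }
}
```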
@@ -68,7 +68,7 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness {

   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(16)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(16)
       .build(false);

   @BeforeEach
@@ -139,7 +139,7 @@ public Integer finish() {
     DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;

     try {
-      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
          getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
@@ -156,7 +156,7 @@ object DataSourceReadOptions {
   val FILE_INDEX_LISTING_MODE_LAZY = "lazy"

   val FILE_INDEX_LISTING_MODE_OVERRIDE: ConfigProperty[String] =
-    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode.override")
+    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode")
       .defaultValue(FILE_INDEX_LISTING_MODE_LAZY)
       .withValidValues(FILE_INDEX_LISTING_MODE_LAZY, FILE_INDEX_LISTING_MODE_EAGER)
       .sinceVersion("0.13.0")
@@ -463,17 +463,6 @@ object DataSourceWriteOptions {

   val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA

-  // NOTE: This is an internal config that is not exposed to the public
-  private[hudi] val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.datasource.write.schema.canonicalize")
-      .defaultValue(true)
-      .sinceVersion("0.13.0")
-      .withDocumentation("Controls whether incoming batch's schema's nullability constraints should be canonicalized "
-        + "relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked "
-        + "as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing "
-        + "null records from the table (for updating, for ex). Note, that this config has only effect when "
-        + "'hoodie.datasource.write.reconcile.schema' is set to false.")
-
   // HIVE SYNC SPECIFIC CONFIGS
   // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
   // unexpected issues with config getting reset
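Since the read config drops the .override suffix, readers pass the new key when forcing a listing mode. A minimal sketch assuming a local SparkSession and a hypothetical table path; only the key and its lazy/eager values come from the diff:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ListingModeExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("listing-mode-example") // placeholder app name
        .master("local[*]")
        .getOrCreate();

    // Renamed key (previously hoodie.datasource.read.file.index.listing.mode.override);
    // valid values stay "lazy" (default) and "eager".
    Dataset<Row> df = spark.read()
        .format("hudi")
        .option("hoodie.datasource.read.file.index.listing.mode", "eager")
        .load("/tmp/hoodie_table"); // hypothetical base path

    df.show();
  }
}
```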
@@ -29,7 +29,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaC
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -72,6 +72,19 @@ import scala.collection.mutable.ListBuffer

 object HoodieSparkSqlWriter {

+  /**
+   * Controls whether incoming batch's schema's nullability constraints should be canonicalized
+   * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
+   * as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing
+   * null records from the table (for updating, for ex). Note, that this config has only effect when
+   * 'hoodie.datasource.write.reconcile.schema' is set to false
+   *
+   * NOTE: This is an internal config that is not exposed to the public
+   */
+  val CANONICALIZE_NULLABLE: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
+      .defaultValue(true)
+
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -397,9 +410,9 @@ object HoodieSparkSqlWriter {
       // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
       // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
       // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
-      val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
-        DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
-      val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+      val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
+        CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+      val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
         AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
       } else {
         sourceSchema
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -532,7 +533,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       // target table, ie partially updating)
       AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
       RECONCILE_SCHEMA.key -> "false",
-      "hoodie.datasource.write.schema.canonicalize" -> "false"
+      CANONICALIZE_NULLABLE.key -> "false"
     )
   }
 }
@@ -87,7 +87,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     // instead of serving already cached value
     spark.sessionState.catalog.invalidateAllCachedTables()

-    spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+    spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")

     val df = spark.sql(s"SELECT * FROM $tableName WHERE partition = '2021-01-05'")
     val optimizedPlan = df.queryExecution.optimizedPlan
@@ -179,7 +179,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     // instead of serving already cached value
     spark.sessionState.catalog.invalidateAllCachedTables()

-    spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+    spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")

     val df = spark.sql(s"SELECT * FROM $tableName")
     val optimizedPlan = df.queryExecution.optimizedPlan