diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b9d7c800250a0..f7e8667a83270 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -272,8 +272,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");
 
-  public static final ConfigProperty WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
-      .key("hoodie.write.executor.disruptor.buffer.size")
+  public static final ConfigProperty WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.write.executor.disruptor.buffer.limit.bytes")
       .defaultValue(String.valueOf(1024))
       .sinceVersion("0.13.0")
       .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");
@@ -1180,8 +1180,8 @@ public String getWriteExecutorDisruptorWaitStrategy() {
     return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
   }
 
-  public Integer getWriteExecutorDisruptorWriteBufferSize() {
-    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
+  public Integer getWriteExecutorDisruptorWriteBufferLimitBytes() {
+    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES));
   }
 
   public boolean shouldCombineBeforeInsert() {
@@ -2542,8 +2542,8 @@ public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) {
      return this;
    }
 
-    public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
-      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+    public Builder withWriteExecutorDisruptorWriteBufferLimitBytes(long size) {
+      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES, String.valueOf(size));
      return this;
    }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
index bd192f649dbe8..7baada740899d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
@@ -52,7 +52,7 @@ public static HoodieExecutor create(HoodieWriteConfig hoodieConfig,
         return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
index 55c2325b137a6..19155f6b31877 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -50,7 +50,7 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(8)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(8)
       .build(false);
 
   @BeforeEach
@@ -94,7 +94,7 @@ public Integer finish() {
 
     DisruptorExecutor exec = null;
     try {
-      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
          Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
@@ -127,7 +127,9 @@ public void testInterruptExecutor() {
       @Override
       public void consume(HoodieRecord record) {
         try {
-          Thread.currentThread().wait();
+          synchronized (this) {
+            wait();
+          }
         } catch (InterruptedException ie) {
           // ignore here
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 7344ccd89fd16..4c8e0dac27dbc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -68,7 +68,7 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(16)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(16)
       .build(false);
 
   @BeforeEach
@@ -139,7 +139,7 @@ public Integer finish() {
 
     DisruptorExecutor>, Integer> exec = null;
     try {
-      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
          getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 9ed04dae626ba..176b40c612a45 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -156,7 +156,7 @@ object DataSourceReadOptions {
   val FILE_INDEX_LISTING_MODE_LAZY = "lazy"
 
   val FILE_INDEX_LISTING_MODE_OVERRIDE: ConfigProperty[String] =
-    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode.override")
+    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode")
       .defaultValue(FILE_INDEX_LISTING_MODE_LAZY)
       .withValidValues(FILE_INDEX_LISTING_MODE_LAZY, FILE_INDEX_LISTING_MODE_EAGER)
       .sinceVersion("0.13.0")
@@ -463,17 +463,6 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
 
-  // NOTE: This is an internal config that is not exposed to the public
-  private[hudi] val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.datasource.write.schema.canonicalize")
-      .defaultValue(true)
-      .sinceVersion("0.13.0")
-      .withDocumentation("Controls whether incoming batch's schema's nullability constraints should be canonicalized "
-        + "relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked "
-        + "as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing "
-        + "null records from the table (for updating, for ex). Note, that this config has only effect when "
-        + "'hoodie.datasource.write.reconcile.schema' is set to false.")
-
   // HIVE SYNC SPECIFIC CONFIGS
   // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
   // unexpected issues with config getting reset
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5dda7c9df78c3..7e234775faa28 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaC
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -72,6 +72,19 @@ import scala.collection.mutable.ListBuffer
 
 object HoodieSparkSqlWriter {
 
+  /**
+   * Controls whether incoming batch's schema's nullability constraints should be canonicalized
+   * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
+   * as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing
+   * null records from the table (for updating, for ex). Note, that this config has only effect when
+   * 'hoodie.datasource.write.reconcile.schema' is set to false
+   *
+   * NOTE: This is an internal config that is not exposed to the public
+   */
+  val CANONICALIZE_NULLABLE: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
+      .defaultValue(true)
+
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -397,9 +410,9 @@ object HoodieSparkSqlWriter {
       // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
       // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
       // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
-      val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
-        DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
-      val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+      val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
+        CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+      val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
         AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
       } else {
         sourceSchema
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9099c7225e0f9..418b2f8d6ecce 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -532,7 +533,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       // target table, ie partially updating)
       AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
       RECONCILE_SCHEMA.key -> "false",
-      "hoodie.datasource.write.schema.canonicalize" -> "false"
+      CANONICALIZE_NULLABLE.key -> "false"
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 36540b43a40a9..06239697db901 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -87,7 +87,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
         // instead of serving already cached value
         spark.sessionState.catalog.invalidateAllCachedTables()
 
-        spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+        spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")
 
         val df = spark.sql(s"SELECT * FROM $tableName WHERE partition = '2021-01-05'")
         val optimizedPlan = df.queryExecution.optimizedPlan
@@ -179,7 +179,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
         // instead of serving already cached value
         spark.sessionState.catalog.invalidateAllCachedTables()
 
-        spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+        spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")
 
         val df = spark.sql(s"SELECT * FROM $tableName")
         val optimizedPlan = df.queryExecution.optimizedPlan