diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4d07097c07c8..a82432bf466b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1065,6 +1065,10 @@ public boolean isHiveStylePartitioningEnabled() { return getBooleanOrDefault(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE); } + public boolean shouldURLEncodePartitionPath() { + return getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING); + } + public int getMarkersTimelineServerBasedBatchNumThreads() { return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index f1e41296f1dd..8b66255e9e47 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -146,16 +146,25 @@ public static String getRecordKey(GenericRecord record, String recordKeyField, b public static String getPartitionPath(GenericRecord record, String partitionPathField, boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) { String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled); - if (partitionPath == null || partitionPath.isEmpty()) { - partitionPath = HUDI_DEFAULT_PARTITION_PATH; + return handlePartitionPathDecoration(partitionPathField, partitionPath, encodePartitionPath, hiveStylePartitioning); + } + + public static String handlePartitionPathDecoration(String partitionPathField, + String partitionPathValue, + boolean encodePartitionPath, + boolean hiveStylePartitioning) { + String decoratedPartitionPath = partitionPathValue; + if (StringUtils.isNullOrEmpty(decoratedPartitionPath)) { + decoratedPartitionPath = HUDI_DEFAULT_PARTITION_PATH; } if (encodePartitionPath) { - partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath); + decoratedPartitionPath = PartitionPathEncodeUtils.escapePathName(decoratedPartitionPath); } if (hiveStylePartitioning) { - partitionPath = partitionPathField + "=" + partitionPath; + decoratedPartitionPath = partitionPathField + "=" + decoratedPartitionPath; } - return partitionPath; + + return decoratedPartitionPath; } /** diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 49827166258b..3f3c20e79784 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -165,8 +165,10 @@ object HoodieSparkUtils extends SparkAdapterSupport { } else { val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr) val transform: GenericRecord => GenericRecord = - if (sameSchema) identity - else { + if (sameSchema) { + identity + } else { + // NOTE: Avro schema parsing is performed outside of the transforming lambda HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema) } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java index 95188bb0b68d..bede905e400c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java @@ -44,7 +44,7 @@ public class FileSystemTestUtils { public static final String FORWARD_SLASH = "/"; public static final String FILE_SCHEME = "file"; public static final String COLON = ":"; - public static final Random RANDOM = new Random(); + public static final Random RANDOM = new Random(0xDEED); public static Path getRandomOuterInMemPath() { String randomFileName = UUID.randomUUID().toString(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java index bc1172f387f0..36c372cb1c96 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java @@ -24,11 +24,11 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.BulkInsertPartitioner; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.sql.Column; @@ -38,6 +38,7 @@ import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; +import scala.collection.JavaConverters; import java.util.ArrayList; import java.util.Arrays; @@ -45,8 +46,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.collection.JavaConverters; - import static org.apache.spark.sql.functions.callUDF; /** @@ -97,8 +96,31 @@ public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlConte // simple fields for both record key and partition path: can directly use withColumn String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields : partitionPathFields.substring(partitionPathFields.indexOf(":") + 1); - rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) - .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType)); + + // TODO(HUDI-3993) cleanup duplication + String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); + String partitionPathDecorationUDFName = PARTITION_PATH_UDF_FN + tableName; + + boolean shouldURLEncodePartitionPath = config.shouldURLEncodePartitionPath(); + boolean isHiveStylePartitioned = config.isHiveStylePartitioningEnabled(); + + if (shouldURLEncodePartitionPath || isHiveStylePartitioned) { + sqlContext.udf().register( + partitionPathDecorationUDFName, + (UDF1) partitionPathValue -> + KeyGenUtils.handlePartitionPathDecoration(partitionPathField, partitionPathValue, + shouldURLEncodePartitionPath, isHiveStylePartitioned), + DataTypes.StringType); + + rowDatasetWithRecordKeysAndPartitionPath = + rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, + callUDF(partitionPathDecorationUDFName, functions.col(partitionPathField).cast(DataTypes.StringType))); + } else { + rowDatasetWithRecordKeysAndPartitionPath = + rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType)); + } } else { // use udf String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java index c9404afe612f..7a54124e88b1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -23,17 +23,16 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.table.HoodieTable; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.sql.catalyst.InternalRow; @@ -126,14 +125,17 @@ public void write(InternalRow record) throws IOException { if (populateMetaFields) { // usual path where meta fields are pre populated in prep step. partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS)); } else { // if meta columns are disabled. + // TODO(HUDI-3993) remove duplication, unify with HoodieDatasetBulkInsertHelper if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen partitionPath = ""; } else if (simpleKeyGen) { // SimpleKeyGen - Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); - partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; - if (writeConfig.isHiveStylePartitioningEnabled()) { - partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath; - } + Object partitionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); + String partitionPathField = keyGeneratorOpt.get().getPartitionPathFields().get(0); + boolean shouldURLEncodePartitionPath = writeConfig.shouldURLEncodePartitionPath(); + boolean hiveStylePartitioningEnabled = writeConfig.isHiveStylePartitioningEnabled(); + + partitionPath = KeyGenUtils.handlePartitionPathDecoration(partitionPathField, + partitionPathValue == null ? null : partitionPathValue.toString(), shouldURLEncodePartitionPath, hiveStylePartitioningEnabled); } else { // only BuiltIn key generators are supported if meta fields are disabled. partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType); diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java index b9f77bccfd56..39da4ea4ff9e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java @@ -52,7 +52,7 @@ */ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarness { - protected static final Random RANDOM = new Random(); + protected static final Random RANDOM = new Random(0xDEED); @BeforeEach public void setUp() throws Exception { diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index 8e87755c294d..f4658b968616 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -109,7 +109,7 @@ public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) t } @Test - public void testDataInternalWriterHiveStylePartitioning() throws Exception { + public void testDataInternalWriterPartitioningHandling() throws Exception { boolean sorted = true; boolean populateMetaFields = false; // init config and table