diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 0aac9308da439..2570e204eef3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -250,7 +250,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
       .key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(false)
+      .defaultValue(true)
       .sinceVersion("0.11.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
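Note on the default flip above: with `hoodie.compaction.preserve.commit.metadata` now defaulting to true, compaction keeps the original `_hoodie_commit_time` on rewritten records unless the writer opts out. A minimal, hypothetical sketch of opting out through a Spark datasource write (only the option key is taken from this patch; the table name, `df`, and `basePath` are placeholders):

```java
// Hypothetical opt-out of commit-metadata preservation during compaction.
// df and basePath are placeholders; only the option key comes from this patch.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PreserveCommitMetadataOptOut {
  public static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "example_table")
        .option("hoodie.compaction.preserve.commit.metadata", "false")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```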
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2d71c4d738886..c64b4ed88b5f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1072,8 +1072,7 @@ public EngineType getEngineType() {
   }
 
   public boolean populateMetaFields() {
-    return Boolean.parseBoolean(getStringOrDefault(HoodieTableConfig.POPULATE_META_FIELDS,
-        HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
+    return getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 32d4ec2a6d794..d38f66a86f912 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -61,6 +61,8 @@ import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
+
 @SuppressWarnings("Duplicates")
 /**
  * Handle to merge incoming records to those in storage.
@@ -262,7 +264,7 @@ private boolean writeUpdateRecord(HoodieRecord hoodieRecord, GenericRecord ol
         isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -272,16 +274,16 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord) throws IOExceptio
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
       return;
     }
-    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
      insertRecordsWritten++;
    }
  }
 
   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
-    return writeRecord(hoodieRecord, indexedRecord, false);
+    return writeRecord(hoodieRecord, indexedRecord, false, null);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
     Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -292,8 +294,10 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option close(); public List writeStatuses() {
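The `oldRecord` now threaded through `writeRecord` feeds the `rewriteRecord` overload added to `HoodieAvroUtils` below, which can fall back to the record on storage for missing meta fields such as `_hoodie_commit_time`. A standalone sketch of that copy-with-fallback idea (illustrative only, not the PR's exact implementation; default handling and specific-record cases are omitted):

```java
// Illustration of "copy fields, falling back to the old record" using plain Avro.
// This is a simplified sketch, not HoodieAvroUtils.rewriteRecord itself.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MetaFieldFallbackSketch {
  public static GenericRecord rewriteWithFallback(GenericRecord incoming, GenericRecord oldRecord, Schema newSchema) {
    GenericRecord out = new GenericData.Record(newSchema);
    for (Schema.Field f : newSchema.getFields()) {
      if (incoming.getSchema().getField(f.name()) != null) {
        out.put(f.name(), incoming.get(f.name()));
      } else if (oldRecord != null && oldRecord.getSchema().getField(f.name()) != null) {
        // field (e.g. _hoodie_commit_time) missing on the incoming record: copy it from storage
        out.put(f.name(), oldRecord.get(f.name()));
      }
    }
    return out;
  }
}
```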
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 209721e24a8d9..5cb18dc8d1509 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -176,7 +176,7 @@ public static Schema addMetadataFields(Schema schema) {
   /**
    * Adds the Hoodie metadata fields to the given schema.
    *
-   * @param schema The schema
+   * @param schema             The schema
    * @param withOperationField Whether to include the '_hoodie_operation' field
    */
   public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
@@ -276,7 +276,7 @@ public static Schema getSchemaForFields(Schema fileSchema, List fields)
     List<Schema.Field> toBeAddedFields = new ArrayList<>();
     Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);
-    for (Schema.Field schemaField: fileSchema.getFields()) {
+    for (Schema.Field schemaField : fileSchema.getFields()) {
       if (fields.contains(schemaField.name())) {
         toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
       }
@@ -303,7 +303,7 @@ public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOpe
    * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
    * determine that.
    *
-   * @param schema Passed in schema
+   * @param schema        Passed in schema
    * @param newFieldNames Null Field names to be added
    */
   public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
@@ -382,10 +382,34 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch
     return newRecord;
   }
 
+  public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+    GenericRecord newRecord = new GenericData.Record(newSchema);
+    boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!(isSpecificRecord && isMetadataField(f.name()))) {
+        copyOldValueOrSetDefault(genericRecord, newRecord, f);
+      }
+      if (isMetadataField(f.name()) && copyOverMetaFields) {
+        // if meta field exists in primary generic record, copy over.
+        if (genericRecord.getSchema().getField(f.name()) != null) {
+          copyOldValueOrSetDefault(genericRecord, newRecord, f);
+        } else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
+          // if not, try to copy from the fallback record.
+          copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
+        }
+      }
+    }
+    if (!GenericData.get().validate(newSchema, newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+    }
+    return newRecord;
+  }
+
   /**
    * Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
    * provided {@code newSchema}.
-   *
+   * <p>
    * To better understand conversion rules please check {@link #rewriteRecord(GenericRecord, Schema)}
    */
   public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Schema newSchema) {
@@ -491,9 +515,8 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
    * Returns the string value of the given record {@code rec} and field {@code fieldName}.
    * The field and value both could be missing.
    *
-   * @param rec The record
+   * @param rec       The record
    * @param fieldName The field name
-   *
    * @return the string form of the field
    * or empty if the schema does not contain the field name or the value is null
    */
@@ -507,7 +530,7 @@ public static Option getNullableValAsString(GenericRecord rec, String fi
    * This method converts values for fields with certain Avro/Parquet data types that require special handling.
    *
    * @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
   public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -527,15 +550,15 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object
   /**
    * This method converts values for fields with certain Avro Logical data types that require special handling.
-   *
+   * <p>
    * Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
    * represented/stored in parquet.
-   *
+   * <p>
    * Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is
    * represented/stored in parquet.
    *
    * @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -569,6 +592,7 @@ public static Schema getNullSchema() {
   /**
    * Sanitizes Name according to Avro rule for names.
    * Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
+   *
    * @param name input name
    * @return sanitized name
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index ac30766dd2f03..0f21ae1bef185 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -42,6 +42,8 @@ public abstract class HoodieRecord implements Serializable {
   public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
   public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
 
+  public static int FILENAME_METADATA_FIELD_POS = 4;
+
   public static final List<String> HOODIE_META_COLUMNS =
       CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
           RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
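`FILENAME_METADATA_FIELD_POS` above indexes `_hoodie_file_name`, the fifth entry in the meta-column order shown in `HOODIE_META_COLUMNS`. A hedged sketch of how a merge path could overwrite just that field by position on a record that already carries meta fields (illustrative only; this is not the exact HoodieMergeHandle code):

```java
// Illustrative only: stamp the new base-file name onto an Avro record by meta-field position.
import org.apache.avro.generic.GenericRecord;

public class FileNameMetaFieldSketch {
  // Meta-column order: _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
  // _hoodie_partition_path, _hoodie_file_name  ->  position 4 is the file name.
  private static final int FILENAME_POS = 4;

  public static void stampFileName(GenericRecord recordWithMetaFields, String newFileName) {
    recordWithMetaFields.put(FILENAME_POS, newFileName);
  }
}
```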
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index dc010366cd3b5..0415d9d3831ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -172,9 +172,9 @@ public class HoodieTableConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
 
-  public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
+  public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
       .key("hoodie.populate.meta.fields")
-      .defaultValue("true")
+      .defaultValue(true)
       .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
           + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 022eef5fff6a5..bb4e04cf6fda0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -395,7 +395,8 @@ object HoodieSparkSqlWriter {
     val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
     val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
     val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
     val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -447,8 +448,8 @@ object HoodieSparkSqlWriter {
                               instantTime: String,
                               partitionColumns: String): (Boolean, common.util.Option[String]) = {
     val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
     val dropPartitionColumns = parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(),
       DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
     // register classes & schemas
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 5baaffab0cf7c..4af3943966aa3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -364,6 +364,7 @@ public void stream(Dataset streamingInput, String operationType, String che
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
         .option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
         .outputMode(OutputMode.Append());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d2257f58d0e80..86d30496896d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -256,7 +256,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Rollback Bootstrap
     HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
@@ -284,7 +284,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     }
 
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
@@ -296,7 +296,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
     checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
-        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);
 
     if (deltaCommit) {
       Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
@@ -304,7 +304,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles, numInstantsAfterBootstrap + 2,
           2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()));
+          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
   }
 
@@ -334,14 +334,14 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
-                                     int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
+                                     int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
     checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
-        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
                                      int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
-                                     List<String> instantsWithValidRecords) throws Exception {
+                                     List<String> instantsWithValidRecords, boolean validateRecordsForCommitTime) throws Exception {
     metaClient.reloadActiveTimeline();
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
@@ -361,8 +361,10 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     if (!isDeltaCommit) {
       String predicate = String.join(", ",
           instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
-      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
-          + "(" + predicate + ")").count());
+      if (validateRecordsForCommitTime) {
+        assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+            + "(" + predicate + ")").count());
+      }
       Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
           + "in (select _hoodie_record_key from bootstrapped)");
       assertEquals(0, missingOriginal.count());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 9146cdc4e81f7..6cd3ae33399a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -248,7 +248,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Rollback Bootstrap
     if (deltaCommit) {
@@ -278,7 +278,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     }
 
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
@@ -290,7 +290,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
     checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
-        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);
 
     if (deltaCommit) {
       Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
@@ -298,7 +298,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles, numInstantsAfterBootstrap + 2,
           2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()));
+          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
     }
   }
 
@@ -328,14 +328,14 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
-                                     int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
+                                     int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
     checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
-        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
   }
 
   private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
                                      int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
-                                     List<String> instantsWithValidRecords) throws Exception {
+                                     List<String> instantsWithValidRecords, boolean validateCommitRecords) throws Exception {
     metaClient.reloadActiveTimeline();
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
@@ -355,8 +355,10 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     if (!isDeltaCommit) {
       String predicate = String.join(", ",
           instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
-      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
-          + "(" + predicate + ")").count());
+      if (validateCommitRecords) {
+        assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+            + "(" + predicate + ")").count());
+      }
       Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
           + "in (select _hoodie_record_key from bootstrapped)");
       assertEquals(0, missingOriginal.count());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ed6ef87b8e14f..5c20939cfb532 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -272,8 +272,9 @@ class TestMORDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit5Time)
       .option(DataSourceReadOptions.END_INSTANTTIME.key, commit6Time)
       .load(basePath)
-    // compaction updated 150 rows + inserted 2 new row
-    assertEquals(152, hudiIncDF6.count())
+    // even though compaction updated 150 rows, since preserve commit metadata is true, they won't be part of the incremental query.
+    // inserted 2 new rows
+    assertEquals(2, hudiIncDF6.count())
   }
 
   @Test
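The `TestMORDataSource` expectation above follows from the new default: compacted records keep their original `_hoodie_commit_time`, so only the two freshly inserted rows fall inside the incremental window around the compaction instant. A rough Java sketch of such an incremental read (string option keys; `basePath` and the instant times are placeholders):

```java
// Hypothetical incremental read between two instants; basePath and instants are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalReadSketch {
  public static Dataset<Row> readIncremental(SparkSession spark, String basePath,
                                              String beginInstant, String endInstant) {
    return spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", beginInstant)
        .option("hoodie.datasource.read.end.instanttime", endInstant)
        .load(basePath);
  }
}
```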
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index e9ed609ca25f9..4866a5be5c583 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -66,7 +66,7 @@ public Optional createWriter(String writeUUID, StructType sche
     String path = options.get("path").get();
     String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
     boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-        Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
+        HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
     mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema);
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index 2e46dea390678..4f7ff89a90a33 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -53,7 +53,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map>> readFromSource(
         .setPartitionFields(partitionColumns)
         .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
         .setPopulateMetaFields(props.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-            Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
+            HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))
         .setKeyGeneratorClassProp(props.getProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()))
         .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
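The `DefaultSource` and `readFromSource` hunks above all drop the `Boolean.parseBoolean(...)` wrapper because `POPULATE_META_FIELDS` now carries a typed boolean default. A generic sketch of why the parse step becomes unnecessary once the default is typed (toy code, not Hudi's `ConfigProperty`):

```java
// Toy illustration: with a typed default, only user-supplied string values need parsing.
import java.util.Collections;
import java.util.Map;

public class TypedDefaultSketch {
  static boolean getBooleanOrDefault(Map<String, String> conf, String key, boolean defaultValue) {
    String v = conf.get(key);
    return v == null ? defaultValue : Boolean.parseBoolean(v);
  }

  public static void main(String[] args) {
    Map<String, String> conf = Collections.emptyMap(); // nothing set explicitly
    // The default is already a boolean, so no Boolean.parseBoolean("true") round-trip is needed.
    System.out.println(getBooleanOrDefault(conf, "hoodie.populate.meta.fields", true)); // prints true
  }
}
```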