@@ -250,7 +250,7 @@ public class HoodieCompactionConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
.key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(false)
+      .defaultValue(true)
.sinceVersion("0.11.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

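Not shown in the PR itself, but worth noting: because the default above flips from false to true, a writer that depends on the old behavior now has to opt out explicitly. A minimal illustrative sketch (table name and path are invented; the same key is set the same way in the streaming test near the end of this diff):

```java
// Sketch only: explicitly restoring the pre-0.11.0 behavior for a Spark datasource write.
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PreserveCommitMetadataOptOut {
  public static void write(Dataset<Row> df) {
    df.write().format("hudi")
        // key defined by the config property above; "false" was the previous default
        .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
        .option(HoodieWriteConfig.TBL_NAME.key(), "example_table") // hypothetical table name
        .mode(SaveMode.Append)
        .save("/tmp/hudi/example_table"); // hypothetical path
  }
}
```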
@@ -1072,8 +1072,7 @@ public EngineType getEngineType() {
}

public boolean populateMetaFields() {
-    return Boolean.parseBoolean(getStringOrDefault(HoodieTableConfig.POPULATE_META_FIELDS,
-        HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
+    return getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
}

/**
@@ -61,6 +61,8 @@
import java.util.Map;
import java.util.Set;

+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;

@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
@@ -262,7 +264,7 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
}
}
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -272,16 +274,16 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
-    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
insertRecordsWritten++;
}
}

protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
-    return writeRecord(hoodieRecord, indexedRecord, false);
+    return writeRecord(hoodieRecord, indexedRecord, false, null);
}

-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -292,8 +294,10 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
try {
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
-        if (preserveMetadata) {
+        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
+        if (preserveMetadata && useWriterSchema) { // useWriterSchema will be true only in case of compaction.
Review comment (Member): if this is the case, a better name could be useWriterSchemaForCompaction.
+          // do not preserve FILENAME_METADATA_FIELD
+          recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
@@ -227,6 +227,10 @@ protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}

+  protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
+  }

public abstract List<WriteStatus> close();

public List<WriteStatus> writeStatuses() {
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java (44 changes: 34 additions & 10 deletions)
@@ -176,7 +176,7 @@ public static Schema addMetadataFields(Schema schema) {
/**
* Adds the Hoodie metadata fields to the given schema.
*
-   * @param schema The schema
+   * @param schema             The schema
* @param withOperationField Whether to include the '_hoodie_operation' field
*/
public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
@@ -276,7 +276,7 @@ public static Schema getSchemaForFields(Schema fileSchema, List<String> fields)
List<Schema.Field> toBeAddedFields = new ArrayList<>();
Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);

-    for (Schema.Field schemaField: fileSchema.getFields()) {
+    for (Schema.Field schemaField : fileSchema.getFields()) {
if (fields.contains(schemaField.name())) {
toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
}
@@ -303,7 +303,7 @@ public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOpe
* engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
* determine that.
*
-   * @param schema Passed in schema
+   * @param schema        Passed in schema
* @param newFieldNames Null Field names to be added
*/
public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
@@ -382,10 +382,34 @@ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSch
return newRecord;
}

+  public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
Review comment (Member): would be good to have at least 1 UT covering this. (A sketch of one possible test follows this file's diff.)
+    GenericRecord newRecord = new GenericData.Record(newSchema);
+    boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!(isSpecificRecord && isMetadataField(f.name()))) {
+        copyOldValueOrSetDefault(genericRecord, newRecord, f);
+      }
+      if (isMetadataField(f.name()) && copyOverMetaFields) {
+        // if meta field exists in primary generic record, copy over.
+        if (genericRecord.getSchema().getField(f.name()) != null) {
+          copyOldValueOrSetDefault(genericRecord, newRecord, f);
+        } else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
+          // if not, try to copy from the fallback record.
+          copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
+        }
+      }
+    }
+    if (!GenericData.get().validate(newSchema, newRecord)) {
+      throw new SchemaCompatibilityException(
+          "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+    }
+    return newRecord;
+  }

/**
* Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
* provided {@code newSchema}.
-   *
+   * <p>
* To better understand conversion rules please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Schema newSchema) {
Expand Down Expand Up @@ -491,9 +515,8 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName, b
* Returns the string value of the given record {@code rec} and field {@code fieldName}.
* The field and value both could be missing.
*
-   * @param rec The record
+   * @param rec       The record
* @param fieldName The field name
-   *
* @return the string form of the field
* or empty if the schema does not contain the field name or the value is null
*/
@@ -507,7 +530,7 @@ public static Option<String> getNullableValAsString(GenericRecord rec, String fi
* This method converts values for fields with certain Avro/Parquet data types that require special handling.
*
* @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
* @return field value either converted (for certain data types) or as it is.
*/
public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -527,15 +550,15 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object

/**
* This method converts values for fields with certain Avro Logical data types that require special handling.
-   *
+   * <p>
* Logical Date Type is converted to actual Date value instead of Epoch Integer which is how it is
* represented/stored in parquet.
-   *
+   * <p>
* Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is
* represented/stored in parquet.
*
* @param fieldSchema avro field schema
-   * @param fieldValue avro field value
+   * @param fieldValue  avro field value
* @return field value either converted (for certain data types) or as it is.
*/
private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
@@ -569,6 +592,7 @@ public static Schema getNullSchema() {
/**
* Sanitizes Name according to Avro rule for names.
* Removes characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names .
+   *
* @param name input name
* @return sanitized name
*/
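Picking up the review comment above about covering the new rewriteRecord overload with a unit test, here is one possible shape such a test could take. This is only a sketch, not part of the PR: the test class, schema, field names, and values are invented, and it assumes nothing beyond the Avro and JUnit 5 dependencies the Hudi test modules already use.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.junit.jupiter.api.Test;

public class TestRewriteRecordWithFallback {

  @Test
  public void metaFieldsAreCopiedFromFallbackWhenMissingInPrimary() {
    // data-only schema without Hudi meta fields
    Schema dataSchema = SchemaBuilder.record("Payload").fields()
        .optionalString("name")
        .endRecord();
    // target schema with the Hudi meta fields added
    Schema schemaWithMeta = HoodieAvroUtils.addMetadataFields(dataSchema);

    GenericRecord incoming = new GenericData.Record(dataSchema);
    incoming.put("name", "alice");

    // fallback record (e.g. the record already on storage) carries the meta fields
    GenericRecord onStorage = new GenericData.Record(schemaWithMeta);
    onStorage.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "20211001000000");
    onStorage.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "key1");
    onStorage.put("name", "alice");

    GenericRecord rewritten =
        HoodieAvroUtils.rewriteRecord(incoming, schemaWithMeta, true, onStorage);

    // meta fields come from the fallback record, data fields from the primary record
    assertEquals("20211001000000", rewritten.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
    assertEquals("key1", rewritten.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString());
    assertEquals("alice", rewritten.get("name").toString());

    // with copyOverMetaFields = false the meta fields are left unset
    GenericRecord rewrittenWithoutMeta =
        HoodieAvroUtils.rewriteRecord(incoming, schemaWithMeta, false, onStorage);
    assertNull(rewrittenWithoutMeta.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
  }
}
```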
@@ -42,6 +42,8 @@ public abstract class HoodieRecord<T> implements Serializable {
public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";

+  public static int FILENAME_METADATA_FIELD_POS = 4;
Review comment (Member): this could be a separate cleanup task: make constants for all meta fields and adopt them across the codebase. (A rough sketch follows this file's diff.)

public static final List<String> HOODIE_META_COLUMNS =
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
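The follow-up the reviewer suggests is not part of this PR, but one possible shape for it is sketched below: positional constants for every meta field, ordered to match HOODIE_META_COLUMNS above. The holder class name is hypothetical.

```java
// Sketch only: positional constants for every meta field, in the same order as
// HOODIE_META_COLUMNS (commit time, commit seqno, record key, partition path, file name).
public final class HoodieMetaFieldPositions { // hypothetical class name
  public static final int COMMIT_TIME_METADATA_FIELD_POS = 0;
  public static final int COMMIT_SEQNO_METADATA_FIELD_POS = 1;
  public static final int RECORD_KEY_METADATA_FIELD_POS = 2;
  public static final int PARTITION_PATH_METADATA_FIELD_POS = 3;
  public static final int FILENAME_METADATA_FIELD_POS = 4;

  private HoodieMetaFieldPositions() {
  }
}
```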
@@ -172,9 +172,9 @@ public class HoodieTableConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");

-  public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
+  public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
.key("hoodie.populate.meta.fields")
.defaultValue("true")
.defaultValue(true)
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
+ "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");

@@ -395,7 +395,8 @@ object HoodieSparkSqlWriter {
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)

HoodieTableMetaClient.withPropertyBuilder()
@@ -447,8 +448,8 @@ object HoodieSparkSqlWriter {
instantTime: String,
partitionColumns: String): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
+      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
val dropPartitionColumns =
parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
// register classes & schemas
@@ -364,6 +364,7 @@ public void stream(Dataset<Row> streamingInput, String operationType, String che
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
.option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());

@@ -256,7 +256,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);

// Rollback Bootstrap
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
@@ -284,7 +284,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
}

checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
-        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);

// Upsert case
long updateTimestamp = Instant.now().toEpochMilli();
@@ -296,15 +296,15 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
-        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);

if (deltaCommit) {
Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent());
client.compact(compactionInstant.get());
checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()));
+          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
}
}

@@ -334,14 +334,14 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
}

private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
-      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception {
+      int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
-        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
}

private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit,
-      List<String> instantsWithValidRecords) throws Exception {
+      List<String> instantsWithValidRecords, boolean validateRecordsForCommitTime) throws Exception {
metaClient.reloadActiveTimeline();
assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(instant, metaClient.getActiveTimeline()
@@ -361,8 +361,10 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
if (!isDeltaCommit) {
String predicate = String.join(", ",
instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
-      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
-          + "(" + predicate + ")").count());
+      if (validateRecordsForCommitTime) {
+        assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+            + "(" + predicate + ")").count());
+      }
Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
+ "in (select _hoodie_record_key from bootstrapped)");
assertEquals(0, missingOriginal.count());