@@ -456,6 +456,16 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
     return getInsertValue(schema, new Properties());
   }
 
+  public Option<IndexedRecord> toHoodieMetadataRecord() {
+    if (key == null || this.isDeletedRecord) {
+      return Option.empty();
+    }
+
+    HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata, bloomFilterMetadata,
+        columnStatMetadata, recordIndexMetadata);
+    return Option.of(record);
+  }
+
   /**
    * Returns the list of filenames added as part of this record.
    */
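For context, a minimal sketch of how a caller might use the new accessor in place of the schema-based API. The `toRecord` helper and its `payload` parameter are hypothetical; `Option`, `HoodieMetadataPayload`, and `HoodieMetadataRecord` are the Hudi types shown in this diff, with import paths as I understand them:

  import org.apache.avro.generic.IndexedRecord;
  import org.apache.hudi.avro.model.HoodieMetadataRecord;
  import org.apache.hudi.common.util.Option;
  import org.apache.hudi.metadata.HoodieMetadataPayload;

  // Sketch only: `payload` would be an existing HoodieMetadataPayload instance.
  static HoodieMetadataRecord toRecord(HoodieMetadataPayload payload) {
    // Old path: payload.getInsertValue(null, new Properties()) -- schema/props
    // are ignored by this payload type, but the call declares IOException,
    // forcing a try/catch at every call site.
    // New path: no arguments, no checked exception.
    Option<IndexedRecord> rec = payload.toHoodieMetadataRecord();
    // Empty when the payload has no key or represents a delete.
    return rec.isPresent() ? (HoodieMetadataRecord) rec.get() : null;
  }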
@@ -27,7 +27,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Tuple3;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -43,7 +42,6 @@
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -300,12 +298,7 @@ private static List<RowData> readColumnStatsIndexByColumns(
     return records.collectAsList().stream().parallel().map(record -> {
       // schema and props are ignored for generating metadata record from the payload
       // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
-      GenericRecord genericRecord;
-      try {
-        genericRecord = (GenericRecord) record.getData().getInsertValue(null, null).orElse(null);
-      } catch (IOException e) {
-        throw new HoodieException("Exception while getting insert value from metadata payload");
-      }
+      GenericRecord genericRecord = (GenericRecord) record.getData().toHoodieMetadataRecord().orElse(null);
Contributor:

Given that this still calls the HoodieMetadataPayload APIs, which implement HoodieRecordPayload, shall we keep these as is? Ideally, once we migrate the metadata payload to the new merger API, these can be removed.
Collaborator (Author):

Yes, we can leave it as is. Sounds like it will not block RFC-46. Will undo the change.
Collaborator (Author):

Just checked the files; if we leave it as is, then there is nothing to change for this PR.
       return (RowData) converter.convert(genericRecord);
     }
     ).collect(Collectors.toList());
@@ -325,7 +325,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       // NOTE: Explicit conversion is required for Scala 2.11
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
-        toScalaOption(record.getData.getInsertValue(null, null))
+        toScalaOption(record.getData.toHoodieMetadataRecord)
           .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
           .orNull
       }))
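For comparison, a hedged Java sketch of the same extraction the Scala snippet performs. `record` is a hypothetical HoodieRecord carrying a HoodieMetadataPayload; `getColumnStatsMetadata` is the accessor used above, and `HoodieMetadataColumnStats` is assumed to live in org.apache.hudi.avro.model alongside HoodieMetadataRecord:

  // Previously: record.getData().getInsertValue(null, null), which required
  // handling a checked IOException.
  Option<IndexedRecord> rec = record.getData().toHoodieMetadataRecord();
  HoodieMetadataColumnStats columnStats =
      rec.isPresent() ? ((HoodieMetadataRecord) rec.get()).getColumnStatsMetadata() : null;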