Merged
Changes from all commits (92 commits)
5a8d574
just schema change
Aug 12, 2025
d211a2c
current progress
Aug 12, 2025
755ea1e
builds
Aug 13, 2025
7089c12
revert avro refactor not needed
Aug 13, 2025
7d221fb
add parquet type parsing
Aug 13, 2025
a852c2a
fix another test
Aug 13, 2025
714e47e
fix another test
Aug 13, 2025
5c34d1a
fix more tests
Aug 13, 2025
b0c0c7f
fix issue with reading col stats record
Aug 13, 2025
35f7030
Merge branch 'master' into col_stats_new_schema
Aug 13, 2025
71d846c
current progress. Timestamp millis seem to be converted to micros in …
Aug 14, 2025
b41b39f
tests mostly working now. small precision decimals giving problems
Aug 16, 2025
e7cfe3e
testing working
Aug 16, 2025
017a4cc
add versioning and refactor
Aug 17, 2025
9b88584
refactor value metadata
Aug 17, 2025
789bbd1
minor cleanup
Aug 17, 2025
ac9702f
refactor the wrapping
Aug 17, 2025
b495642
builds
Aug 17, 2025
ecf1b14
Merge branch 'master' into col_stats_new_schema
Aug 17, 2025
7d067bb
clean up a few things
Aug 17, 2025
b9f3fc3
use null type instead of the v1 type
Aug 17, 2025
e2c834f
more cleanup
Aug 18, 2025
38a1e14
add updgrade/downgrade handling
Aug 18, 2025
d12c482
Merge branch 'master' into col_stats_new_schema
Aug 25, 2025
820ca40
Merge branch 'master' into col_stats_new_schema
Sep 3, 2025
b10e7c8
fix test failure datagen
Sep 3, 2025
eba6ceb
current progress on testing
Sep 8, 2025
8fff23d
Merge branch 'master' into col_stats_new_schema
Sep 8, 2025
fc598f6
disable tests not ready
Sep 8, 2025
9a59641
build for spark 3.4
Sep 8, 2025
add0ab0
builds on spark3.3
Sep 8, 2025
672ce5b
test passes for 3.3
Sep 8, 2025
d655091
fix spark 3.4 and hopefully 3.5
Sep 8, 2025
262492c
fix index version test
Sep 8, 2025
e8d5675
fix some more failing tests
Sep 9, 2025
bee20e5
fix array logic
Sep 9, 2025
73f2303
fix validation for supported types
Sep 9, 2025
bad18ed
fix some more test failures
Sep 10, 2025
be8241b
fix data type issue with timestamp partition cols
Sep 10, 2025
ed16043
fix single array value not using arraycomparable
Sep 10, 2025
ac3364a
disable test that doesn't work without row writer
Sep 10, 2025
0d38bbe
allow row writer again, but disable it for the ds tests with the logi…
Sep 10, 2025
5ee696f
fix schema evolution
Sep 10, 2025
e2d4f3e
fix expression index
Sep 11, 2025
864d78c
make decimal fixed by default in internal schema
Sep 11, 2025
e1a45ee
rever disabled tests
Sep 11, 2025
ef6350d
fix expression index
Sep 12, 2025
079393a
fix and disable a few more tests
Sep 12, 2025
9e15009
fix partition stats testing and test value type enums so they don't g…
Sep 12, 2025
2878663
revert throw exception
Sep 12, 2025
c5e59da
switch to using localdate as the type for date since online research …
Sep 13, 2025
9687489
fix flink array support
Sep 13, 2025
5e6b116
fix failing tests
Sep 13, 2025
ba5e119
fix the type processor things
Sep 13, 2025
a955d92
fix spark 33 issue
Sep 13, 2025
6333f2c
actually fix spark
Sep 13, 2025
11edda9
move spark value metadata to its own file, revert the gh actions chan…
Sep 15, 2025
4f2f5c4
allow compare between arrays of different length
Sep 15, 2025
1bdb64b
Merge branch 'master' into col_stats_new_schema
Sep 15, 2025
90dc68d
fix test import
Sep 15, 2025
8eff944
finish the backwards compat testing
Sep 15, 2025
3630ba2
fix schema because nulls need to come first in unions and only null d…
Sep 15, 2025
87bfc6f
add fix for potential npe
Sep 16, 2025
b31de9a
adjust spark adapter bug for 3.3
Sep 16, 2025
357675c
remove array support for col stats
Sep 16, 2025
9abec52
refactor collect col stats to break apart v1 and v2 stuff
Sep 16, 2025
0e45de4
address some minor comments
Sep 17, 2025
fbb531c
fix validation for .zip, and also get rid of failing bundle validation
Sep 17, 2025
c84c787
switch to original type for 1.10.1 parquet compat
Sep 18, 2025
5beb5b0
revert bot.yml
Sep 18, 2025
cca547d
remove another ref
Sep 18, 2025
06c79cf
get more debug from validation
Sep 18, 2025
7029fe7
Merge branch 'master' into col_stats_new_schema
Sep 18, 2025
f7dad4d
try to print out the logs again, also make the value metadata spark a…
Sep 18, 2025
6b1fe2e
another try to get more debugging
Sep 18, 2025
656d4e1
add parquet 10 adapter to prevent logical annotation issues
Sep 18, 2025
8f946f5
split adapter into 2 parts, and try to load class because checking ve…
Sep 18, 2025
b462c63
another try to get it working
Sep 18, 2025
c01ae4e
use instanceof instead of those enums
Sep 18, 2025
681833e
restore validate script
Sep 18, 2025
b9a3ecc
add spark conf flags to the validation
Sep 19, 2025
f86bbf2
disable the hive bundle validation
Sep 19, 2025
f1b7f67
Merge branch 'master' into col_stats_new_schema
Sep 19, 2025
dade2b6
directly use the class so it will fail when loaded
Sep 19, 2025
c174395
addressing review comments
Sep 19, 2025
17d5552
Merge branch 'master' into col_stats_new_schema
Sep 22, 2025
62cfcb4
catch all exception
Sep 22, 2025
d018e59
catch throwable instead of exception
Sep 22, 2025
6840594
enable upgrade tests
Sep 22, 2025
1940eb8
address review comments
Sep 23, 2025
fcc6153
Fix nits
yihua Sep 23, 2025
4cfa36c
Merge branch 'master' into col_stats_new_schema
yihua Sep 23, 2025
---- Changed files ----
@@ -37,6 +37,7 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.stats.ValueMetadata;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
@@ -52,7 +53,6 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.HoodieAvroWrapperUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -231,12 +231,13 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>(fileToColumnStatsMap.size());

for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
ValueMetadata valueMetadata = ValueMetadata.getValueMetadata(entry.getValue().getValueType());
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
partitionAndFileNameToFileId.get(entry.getKey()),
// NOTE: Here we assume that the type of the primary key field is string
unwrapAvroValueWrapper(entry.getValue().getMinValue()).toString(),
unwrapAvroValueWrapper(entry.getValue().getMaxValue()).toString()
valueMetadata.unwrapValue(entry.getValue().getMinValue()).toString(),
valueMetadata.unwrapValue(entry.getValue().getMaxValue()).toString()
Comment on lines +234 to +240

Contributor: For ensuring backwards compatibility through tests, could you generate test artifacts/tables using the 0.15.0 and 1.0.2 releases (and from this PR) with column stats enabled and columns of different primitive and logical types (use MOR with log files), and use those for validating column stats reading and data skipping? That'll give us much more confidence on compatibility.

Contributor: @jonvex also, when doing the backward compatibility testing, let's enforce stricter validation by comparing the metadata records in the column stats index in the Metadata Table written by 0.14 and 1.1 (this PR), using the same set of inserts/updates, to make sure the storage bytes are the same.

)));
}

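Pulling the changed lines above out of the interleaved diff, here is a minimal sketch of the new lookup path. `ValueMetadata.getValueMetadata`, `unwrapValue`, and `getValueType` come from the diff; the wrapper class, its method name, and the `HoodieMetadataColumnStats` import are assumptions for illustration only.

```java
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.stats.ValueMetadata;

// Sketch only: the value type persisted with each column-stats record now drives the
// unwrap, replacing the type-agnostic static unwrapAvroValueWrapper helper.
final class ColumnStatsUnwrapSketch {
  static String[] minMaxRecordKeys(HoodieMetadataColumnStats stats) {
    ValueMetadata valueMetadata = ValueMetadata.getValueMetadata(stats.getValueType());
    // As in the original code, the record key is assumed to be a string, so the
    // unwrapped bounds are stringified for the bloom index range pruning.
    return new String[] {
        valueMetadata.unwrapValue(stats.getMinValue()).toString(),
        valueMetadata.unwrapValue(stats.getMaxValue()).toString()
    };
  }
}
```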
---- Next file ----
@@ -28,7 +28,6 @@
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -59,7 +58,9 @@
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.CommonClientUtils;
@@ -83,6 +84,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;

/**
@@ -432,15 +434,17 @@ protected void processAppendResult(AppendResult result, Option<HoodieLogBlock> d
updateWriteStatus(result, stat);

if (config.isMetadataColumnStatsIndexEnabled()) {
HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, hoodieTable.getMetaClient());
Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
.getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
Option.of(this.recordMerger.getRecordType())).keySet());
Option.of(this.recordMerger.getRecordType()), indexVersion).keySet());
final List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndexSet.stream()
.map(fieldName -> HoodieAvroUtils.getSchemaForField(writeSchemaWithMetaFields, fieldName)).collect(Collectors.toList());
try {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataMap =
collectColumnRangeMetadata(recordList.iterator(), fieldsToIndex, stat.getPath(), writeSchemaWithMetaFields, storage.getConf());
collectColumnRangeMetadata(recordList.iterator(), fieldsToIndex, stat.getPath(), writeSchemaWithMetaFields, storage.getConf(),
indexVersion);
stat.putRecordsStats(columnRangeMetadataMap);
} catch (HoodieException e) {
throw new HoodieAppendException("Failed to extract append result", e);
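Because the old and new lines are interleaved above, a hedged consolidation of the added path may help. Helper and constant names are taken from the diff; the reading that the index version selects between the legacy and the new typed column-stats layout is an inference from the PR, not a confirmed contract.

```java
// Sketch: resolve the column-stats index version once from the table's existing
// index definition (or a default when none exists yet)...
HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault(
    PARTITION_NAME_COLUMN_STATS, hoodieTable.getMetaClient());

// ...thread it through the columns-to-index lookup...
Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
    .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
        config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
        Option.of(this.recordMerger.getRecordType()), indexVersion).keySet());

// ...and into the per-file stats collection recorded on the write stat.
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataMap =
    collectColumnRangeMetadata(recordList.iterator(), fieldsToIndex, stat.getPath(),
        writeSchemaWithMetaFields, storage.getConf(), indexVersion);
stat.putRecordsStats(columnRangeMetadataMap);
```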
---- Next file ----
@@ -35,7 +35,6 @@
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
@@ -45,6 +44,8 @@
import java.util.Collections;
import java.util.List;

import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;

/**
* Compared to other Write Handles, HoodieBinaryCopyHandle merge multiple inputFiles into a single outputFile without performing
* extra operations like data serialization/deserialization or compression/decompression.
@@ -75,12 +76,12 @@ private MessageType getWriteSchema(HoodieWriteConfig config, List<StoragePath> i
return fileSchema;
} catch (Exception e) {
LOG.error("Failed to read schema from input file", e);
throw new HoodieIOException("Failed to read schema from input file when schema evolution is disabled: " + inputFiles.get(0),
e instanceof IOException ? (IOException) e : new IOException(e));
}
} else {
// Default behavior: use the table's write schema for evolution
return new AvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
return getAvroSchemaConverter(conf).convert(writeSchemaWithMetaFields);
}
}

---- Next file ----
@@ -123,6 +123,7 @@
import static org.apache.hudi.metadata.HoodieMetadataWriteUtils.createMetadataWriteConfig;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
@@ -578,10 +579,11 @@ private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeCo
if (partitionIdToAllFilesMap.isEmpty()) {
return Pair.of(Collections.emptyList(), Pair.of(fileGroupCount, engineContext.emptyHoodieData()));
}
HoodieIndexVersion columnStatsIndexVersion = existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataMetaClient);
// Find the columns to index
final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), tableSchema, true,
Option.of(dataWriteConfig.getRecordMerger().getRecordType())).keySet());
Option.of(dataWriteConfig.getRecordMerger().getRecordType()), columnStatsIndexVersion).keySet());

if (columnsToIndex.isEmpty()) {
// this can only happen if meta fields are disabled and cols to index is not explicitly overridden.
---- Next file ----
@@ -83,8 +83,8 @@ public UpgradeDowngrade.TableConfigChangeSet downgrade(HoodieWriteConfig config,
&& isComplexKeyGeneratorWithSingleRecordKeyField(metaClient.getTableConfig())) {
throw new HoodieUpgradeDowngradeException(getComplexKeygenErrorMessage("downgrade"));
}
// Handle secondary index.
UpgradeDowngradeUtils.dropNonV1SecondaryIndexPartitions(
// Handle index Changes
UpgradeDowngradeUtils.dropNonV1IndexPartitions(
config, context, table, upgradeDowngradeHelper, "downgrading from table version nine to eight");
// Update table properties.
Set<ConfigProperty> propertiesToRemove = new HashSet<>();
---- Next file ----
@@ -56,7 +56,6 @@
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -285,23 +284,15 @@ static boolean isMetadataTableBehindDataTable(HoodieWriteConfig config,
* @param table Hoodie table
* @param operationType Type of operation (upgrade/downgrade)
*/
public static void dropNonV1SecondaryIndexPartitions(HoodieWriteConfig config, HoodieEngineContext context,
HoodieTable table, SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {
public static void dropNonV1IndexPartitions(HoodieWriteConfig config, HoodieEngineContext context,
HoodieTable table, SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {
HoodieTableMetaClient metaClient = table.getMetaClient();
try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(config, context)) {
List<String> mdtPartitions = metaClient.getTableConfig().getMetadataPartitions()
.stream()
.filter(partition -> {
// Only drop secondary indexes that are not V1
return metaClient.getIndexForMetadataPartition(partition)
.map(indexDef -> {
if (MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX)) {
return HoodieIndexVersion.V1.lowerThan(indexDef.getVersion());
}
return false;
})
.orElse(false);
})
.filter(partition -> metaClient.getIndexForMetadataPartition(partition)
.map(indexDef -> HoodieIndexVersion.V1.lowerThan(indexDef.getVersion()))
Comment on lines +287 to +294

Contributor: Could we abstract the index upgrade process and move the logic to a central place so it's easier to handle index layout upgrade/downgrade?

Contributor: We'll take this separately.

.orElse(false))
.collect(Collectors.toList());
LOG.info("Dropping from MDT partitions for {}: {}", operationType, mdtPartitions);
if (!mdtPartitions.isEmpty()) {
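The removed and added predicates are easier to compare side by side. This sketch assumes the rename from dropNonV1SecondaryIndexPartitions to dropNonV1IndexPartitions amounts to exactly this predicate change, so indexes other than secondary indexes (for example, column stats written at a newer version) are now also dropped on downgrade.

```java
// Before: only secondary-index partitions with an index version above V1 were dropped.
boolean dropBefore = metaClient.getIndexForMetadataPartition(partition)
    .map(indexDef -> MetadataPartitionType.fromPartitionPath(indexDef.getIndexName())
            .equals(MetadataPartitionType.SECONDARY_INDEX)
        && HoodieIndexVersion.V1.lowerThan(indexDef.getVersion()))
    .orElse(false);

// After (this PR): any metadata partition whose index definition is newer than V1 is dropped.
boolean dropAfter = metaClient.getIndexForMetadataPartition(partition)
    .map(indexDef -> HoodieIndexVersion.V1.lowerThan(indexDef.getVersion()))
    .orElse(false);
```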
---- Next file ----
@@ -266,7 +266,7 @@ void testDowngradeDropsOnlyV2OrAboveIndexes() {
)).thenAnswer(invocation -> null); // Do nothing

// Mock the dropNonV1SecondaryIndexPartitions to simulate dropping V2 indexes
mockedUtils.when(() -> UpgradeDowngradeUtils.dropNonV1SecondaryIndexPartitions(
mockedUtils.when(() -> UpgradeDowngradeUtils.dropNonV1IndexPartitions(
Contributor: Do we have functional tests (without mocks) on dropping v2 indexes?

eq(config),
eq(context),
eq(table),
---- Next file ----
@@ -19,7 +19,6 @@
package org.apache.hudi.io.log.block;

import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
@@ -29,6 +28,8 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.ColumnRangeMetadataProvider;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.storage.HoodieStorage;

import org.apache.avro.Schema;
@@ -97,10 +98,10 @@ public ByteArrayOutputStream getContentBytes(HoodieStorage storage) throws IOExc
}

@Override
public Map<String, HoodieColumnRangeMetadata<Comparable>> getColumnRangeMeta(String filePath) {
public Map<String, HoodieColumnRangeMetadata<Comparable>> getColumnRangeMeta(String filePath, HoodieIndexVersion indexVersion) {
ValidationUtils.checkArgument(parquetMetadata != null, "parquetMetadata should not be null.");
ParquetUtils parquetUtils = new ParquetUtils();
List<HoodieColumnRangeMetadata<Comparable>> columnMetaList = parquetUtils.readColumnStatsFromMetadata(parquetMetadata, filePath, Option.empty());
List<HoodieColumnRangeMetadata<Comparable>> columnMetaList = parquetUtils.readColumnStatsFromMetadata(parquetMetadata, filePath, Option.empty(), indexVersion);
return columnMetaList.stream().collect(Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, colMeta -> colMeta));
}
}
---- Next file ----
@@ -19,7 +19,6 @@
package org.apache.hudi.io.v2;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.AppendResult;
@@ -36,7 +35,9 @@
import org.apache.hudi.io.log.block.HoodieFlinkAvroDataBlock;
import org.apache.hudi.io.log.block.HoodieFlinkParquetDataBlock;
import org.apache.hudi.io.storage.ColumnRangeMetadataProvider;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.Lazy;
@@ -55,6 +56,7 @@
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

/**
* A write handle that supports creating a log file and writing records based on record Iterator.
@@ -107,24 +109,22 @@ protected void processAppendResult(AppendResult result, Option<HoodieLogBlock> d

// for parquet data block, we can get column stats from parquet footer directly.
if (config.isMetadataColumnStatsIndexEnabled()) {
HoodieIndexVersion indexVersion = HoodieTableMetadataUtil.existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, hoodieTable.getMetaClient());
Set<String> columnsToIndexSet = new HashSet<>(HoodieTableMetadataUtil
.getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
Option.of(HoodieRecord.HoodieRecordType.FLINK)).keySet());
Option.of(HoodieRecord.HoodieRecordType.FLINK), indexVersion).keySet());

Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata;
if (dataBlock.isEmpty()) {
// only delete block exists
columnRangeMetadata = new HashMap<>();
for (String col: columnsToIndexSet) {
columnRangeMetadata.put(col, HoodieColumnRangeMetadata.create(
stat.getPath(), col, null, null, 0L, 0L, 0L, 0L));
}
columnsToIndexSet.forEach(col -> columnRangeMetadata.put(col, HoodieColumnRangeMetadata.createEmpty(stat.getPath(), col, indexVersion)));
} else {
ValidationUtils.checkArgument(dataBlock.get() instanceof ColumnRangeMetadataProvider,
"Log block for Flink ingestion should always be an instance of ColumnRangeMetadataProvider for collecting column stats efficiently.");
columnRangeMetadata =
((ColumnRangeMetadataProvider) dataBlock.get()).getColumnRangeMeta(stat.getPath()).entrySet().stream()
((ColumnRangeMetadataProvider) dataBlock.get()).getColumnRangeMeta(stat.getPath(), indexVersion).entrySet().stream()
.filter(e -> columnsToIndexSet.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
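For the delete-only case, the hand-built empty ranges are replaced by a factory that carries the index version. The sketch below uses the names from the diff; the claim that createEmpty emits the empty stats in the layout matching the index version is an assumption based on the PR's intent.

```java
// Sketch: when a log append contains only a delete block there are no records to scan,
// so each indexed column gets an empty range entry instead of a manual
// HoodieColumnRangeMetadata.create(path, col, null, null, 0L, 0L, 0L, 0L).
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata = new HashMap<>();
columnsToIndexSet.forEach(col ->
    columnRangeMetadata.put(col,
        HoodieColumnRangeMetadata.createEmpty(stat.getPath(), col, indexVersion)));
```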
---- Next file ----
@@ -57,6 +57,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampNTZType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.TimestampType$;
import org.apache.spark.sql.types.UserDefinedType;
@@ -267,10 +268,14 @@ private static DataType constructSparkSchemaFromType(Type type) {
case DATE:
return DateType$.MODULE$;
case TIME:
case TIME_MILLIS:
throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
case TIMESTAMP:
// todo support TimeStampNTZ
case TIMESTAMP_MILLIS:
return TimestampType$.MODULE$;
case LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_MICROS:
return TimestampNTZType$.MODULE$;
case STRING:
return StringType$.MODULE$;
case UUID:
Expand All @@ -280,7 +285,9 @@ private static DataType constructSparkSchemaFromType(Type type) {
case BINARY:
return BinaryType$.MODULE$;
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) type;
case DECIMAL_BYTES:
case DECIMAL_FIXED:
Types.DecimalBase decimal = (Types.DecimalBase) type;
return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
default:
throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Spark", type));
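Collected in one place, the Spark mapping for the added type variants reads roughly as follows. The enum constants and Spark types are taken from the diff; the assumption that the surrounding method switches on the internal type's TypeID follows from the existing branches but is not shown in the hunk.

```java
// Sketch of the resulting branches in constructSparkSchemaFromType.
switch (type.typeId()) {
  case TIME:
  case TIME_MILLIS:
    // Spark has no time-of-day type, so both variants remain unsupported.
    throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
  case TIMESTAMP:
  case TIMESTAMP_MILLIS:
    // Millisecond timestamps widen to Spark's microsecond-based TimestampType.
    return TimestampType$.MODULE$;
  case LOCAL_TIMESTAMP_MILLIS:
  case LOCAL_TIMESTAMP_MICROS:
    // Timezone-less timestamps now map to TimestampNTZType instead of being unsupported.
    return TimestampNTZType$.MODULE$;
  case DECIMAL:
  case DECIMAL_BYTES:
  case DECIMAL_FIXED:
    // Every decimal encoding exposes precision/scale through the shared DecimalBase parent.
    Types.DecimalBase decimal = (Types.DecimalBase) type;
    return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
  default:
    throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Spark", type));
}
```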