diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1cdad6c565607..54964b0c63805 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1453,6 +1453,10 @@ public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() { return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled(); } + public int getColumnStatsIndexParallelism() { + return metadataConfig.getColumnStatsIndexParallelism(); + } + public int getBloomIndexKeysPerBucket() { return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7eafe268ba8e8..78c391dba79b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -18,12 +18,14 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; @@ -52,6 +54,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieAppendException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.table.HoodieTable; @@ -71,6 +74,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; + /** * IO Operation to append data onto an existing file. */ @@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat, AppendResult result) { statuses.add(this.writeStatus); } - private void processAppendResult(AppendResult result) { + /** + * Get column statistics for the records part of this append handle. + * + * @param filePath - Log file that records are part of + * @param recordList - List of records appended to the log for which column statistics is needed for + * @param columnRangeMap - Output map to accumulate the column statistics for the records + */ + private void getRecordsStats(final String filePath, List recordList, + Map> columnRangeMap) { + recordList.forEach(record -> accumulateColumnRanges(record, writeSchemaWithMetaFields, filePath, columnRangeMap, config.isConsistentLogicalTimestampEnabled())); + } + + /** + * Accumulate column range statistics for the requested record. 
+ * + * @param record - Record to get the column range statistics for + * @param schema - Schema for the record + * @param filePath - File that record belongs to + */ + private static void accumulateColumnRanges(IndexedRecord record, Schema schema, String filePath, + Map> columnRangeMap, boolean consistentLogicalTimestampEnabled) { + if (!(record instanceof GenericRecord)) { + throw new HoodieIOException("Record is not a generic type to get column range metadata!"); + } + schema.getFields().forEach(field -> { + final String fieldVal = HoodieAvroUtils.getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled); + final int fieldSize = fieldVal == null ? 0 : fieldVal.length(); + final HoodieColumnRangeMetadata fieldRange = new HoodieColumnRangeMetadata<>( + filePath, + field.name(), + fieldVal, + fieldVal, + fieldVal == null ? 1 : 0, // null count + fieldVal == null ? 0 : 1, // value count + fieldSize, + fieldSize + ); + columnRangeMap.merge(field.name(), fieldRange, COLUMN_RANGE_MERGE_FUNCTION); + }); + } + + private void processAppendResult(AppendResult result, List recordList) { HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) this.writeStatus.getStat(); if (stat.getPath() == null) { @@ -339,6 +385,13 @@ private void processAppendResult(AppendResult result) { updateWriteStatus(stat, result); } + if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) { + Map> columnRangeMap = stat.getRecordsStats().isPresent() + ? stat.getRecordsStats().get().getStats() : new HashMap<>(); + getRecordsStats(stat.getPath(), recordList, columnRangeMap); + stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap)); + } + resetWriteCounts(); assert stat.getRuntimeStats() != null; LOG.info(String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", partitionPath, @@ -376,7 +429,7 @@ protected void appendDataAndDeleteBlocks(Map header) if (blocks.size() > 0) { AppendResult appendResult = writer.appendBlocks(blocks); - processAppendResult(appendResult); + processAppendResult(appendResult, recordList); recordList.clear(); keysToDelete.clear(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 12d075e0cb532..dd9fc64ff6bf5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -19,14 +19,8 @@ package org.apache.hudi.io; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; @@ -39,8 +33,6 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -53,56 +45,23 @@ public class HoodieKeyLookupHandle exten private final BloomFilter 
bloomFilter; private final List candidateRecordKeys; - private final boolean useMetadataTableIndex; - private Option fileName = Option.empty(); private long totalKeysChecked; public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, Pair partitionPathFileIDPair) { - this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false); - } - - public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, - Pair partitionPathFileIDPair, Option fileName, - boolean useMetadataTableIndex) { super(config, hoodieTable, partitionPathFileIDPair); this.candidateRecordKeys = new ArrayList<>(); this.totalKeysChecked = 0; - if (fileName.isPresent()) { - ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()), - "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'"); - this.fileName = fileName; - } - this.useMetadataTableIndex = useMetadataTableIndex; this.bloomFilter = getBloomFilter(); } private BloomFilter getBloomFilter() { - BloomFilter bloomFilter = null; - HoodieTimer timer = new HoodieTimer().startTimer(); - try { - if (this.useMetadataTableIndex) { - ValidationUtils.checkArgument(this.fileName.isPresent(), - "File name not available to fetch bloom filter from the metadata table index."); - Option bloomFilterByteBuffer = - hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get()); - if (!bloomFilterByteBuffer.isPresent()) { - throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()); - } - bloomFilter = - new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(), - BloomFilterTypeCode.DYNAMIC_V0); - } else { - try (HoodieFileReader reader = createNewFileReader()) { - bloomFilter = reader.readBloomFilter(); - } - } + try (HoodieFileReader reader = createNewFileReader()) { + LOG.debug(String.format("Read bloom filter from %s", partitionPathFileIDPair)); + return reader.readBloomFilter(); } catch (IOException e) { - throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s", - getPartitionPathFileIDPair().getLeft(), this.fileName, e)); + throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e); } - LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer())); - return bloomFilter; } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eee676822a8aa..07dbf7212d121 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -56,6 +56,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -81,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import 
static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; @@ -121,7 +123,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param hadoopConf - Hadoop configuration to use for the metadata writer * @param writeConfig - Writer config * @param engineContext - Engine context - * @param actionMetadata - Optional action metadata to help decide bootstrap operations + * @param actionMetadata - Optional action metadata to help decide initialize operations * @param - Action metadata types extending Avro generated SpecificRecordBase * @param inflightInstantTimestamp - Timestamp of any instant in progress */ @@ -203,7 +205,7 @@ private void enablePartitions() { * @param metadataConfig - Table config * @param metaClient - Meta client for the metadata table * @param fsView - Metadata table filesystem view to use - * @param isBootstrapCompleted - Is metadata table bootstrap completed + * @param isBootstrapCompleted - Is metadata table initialize completed */ private void enablePartition(final MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig, final Option metaClient, Option fsView, boolean isBootstrapCompleted) { @@ -319,13 +321,13 @@ public List getEnabledPartitionTypes() { /** * Initialize the metadata table if it does not exist. - * - * If the metadata table does not exist, then file and partition listing is used to bootstrap the table. + *
+ * If the metadata table does not exist, then file and partition listing is used to initialize the table. * * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase + * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to bootstrap the metadata table. + * while deciding to initialize the metadata table. */ protected abstract void initialize(HoodieEngineContext engineContext, Option actionMetadata, @@ -347,64 +349,62 @@ public void initTableMetadata() { /** * Bootstrap the metadata table if needed. * - * @param engineContext - Engine context - * @param dataMetaClient - Meta client for the data table - * @param actionMetadata - Optional action metadata - * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param dataMetaClient - Meta client for the data table + * @param actionMetadata - Optional action metadata + * @param - Action metadata types extending Avro generated SpecificRecordBase * @param inflightInstantTimestamp - Timestamp of an instant in progress on the dataset. This instant is ignored * @throws IOException */ - protected void bootstrapIfNeeded(HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - Option actionMetadata, - Option inflightInstantTimestamp) throws IOException { + protected void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option actionMetadata, + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME)); - boolean rebootstrap = false; + boolean reInitialize = false; // If the un-synced instants have been archived, then - // the metadata table will need to be bootstrapped again. + // the metadata table will need to be initialized again. if (exists) { final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()) .setBasePath(metadataWriteConfig.getBasePath()).build(); final Option latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); - rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata); + reInitialize = isBootstrapNeeded(latestMetadataInstant, actionMetadata); } - if (rebootstrap) { + if (reInitialize) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1)); - LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped"); + LOG.info("Deleting Metadata Table directory so that it can be re-initialized"); dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath()), true); exists = false; } if (!exists) { // Initialize for the first time by listing partitions and files directly from the file system - if (bootstrapFromFilesystem(engineContext, dataMetaClient, inflightInstantTimestamp)) { + if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } } } /** - * Whether bootstrap operation needed for this metadata table. + * Whether initialize operation needed for this metadata table. *
* Rollback of the first commit would look like un-synced instants in the metadata table. - * Action metadata is needed to verify the instant time and avoid erroneous bootstrapping. + * Action metadata is needed to verify the instant time and avoid erroneous initialization. *
* TODO: Revisit this logic and validate that filtering for all * commits timeline is the right thing to do * - * @return True if the bootstrap is not needed, False otherwise + * @return True if the initialize is not needed, False otherwise */ private boolean isBootstrapNeeded(Option latestMetadataInstant, Option actionMetadata) { if (!latestMetadataInstant.isPresent()) { - LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found"); + LOG.warn("Metadata Table will need to be re-initialized as no instants were found"); return true; } @@ -417,7 +417,7 @@ private boolean isBootstrapNeeded(Option pendingDataInstant = dataMetaClient.getActiveTimeline() .getInstants().filter(i -> !i.isCompleted()) .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) @@ -493,7 +489,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi if (!pendingDataInstant.isEmpty()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot bootstrap metadata table as operation(s) are in progress on the dataset: " + LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: " + Arrays.toString(pendingDataInstant.toArray())); return false; } @@ -518,15 +514,10 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi initTableMetadata(); initializeEnabledFileGroups(dataMetaClient, createInstantTime); - // List all partitions in the basePath of the containing dataset - LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Bootstrap: initializing metadata table by listing files and partitions"); - List dirInfoList = listAllPartitions(dataMetaClient); - - // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these - // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well. - // Hence, we have a special commit just for the bootstrap scenario. - bootstrapCommit(dirInfoList, createInstantTime); + // During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out + // of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale + // well. Hence, we have a special commit just for the initialization scenario. + initialCommit(createInstantTime); return true; } @@ -642,6 +633,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata } } + private MetadataRecordsGenerationParams getRecordsGenerationParams() { + return new MetadataRecordsGenerationParams( + dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), + dataWriteConfig.getBloomIndexParallelism(), + dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism()); + } + /** * Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table. * Updates of different commit metadata uses the same method to convert to HoodieRecords and hence. 
@@ -672,8 +671,8 @@ private void processAndCommit(String instantTime, ConvertMetadataFunction co */ @Override public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - commitMetadata, dataMetaClient, dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(), instantTime), !isTableServiceAction); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords( + engineContext, commitMetadata, instantTime, getRecordsGenerationParams()), !isTableServiceAction); } /** @@ -684,8 +683,8 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime, bool */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - cleanMetadata, dataMetaClient, instantTime), false); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, + cleanMetadata, getRecordsGenerationParams(), instantTime), false); } /** @@ -697,7 +696,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, - enabledPartitionTypes, metadataMetaClient.getActiveTimeline(), restoreMetadata, dataMetaClient, instantTime, + metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime()), false); } @@ -723,8 +722,8 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) } Map> records = - HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - metadataMetaClient.getActiveTimeline(), rollbackMetadata, dataMetaClient, instantTime, + HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), + rollbackMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime(), wasSynced); commit(instantTime, records, false); } @@ -836,20 +835,29 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan } /** - * This is invoked to bootstrap metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to + * This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to * other regular commits. - * */ - protected void bootstrapCommit(List partitionInfoList, String createInstantTime) { - List partitions = partitionInfoList.stream().map(p -> - p.getRelativePath().isEmpty() ? 
NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList()); - final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum(); + private void initialCommit(String createInstantTime) { + // List all partitions in the basePath of the containing dataset + LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions"); + + List partitionInfoList = listAllPartitions(dataMetaClient); + List partitions = new ArrayList<>(); + AtomicLong totalFiles = new AtomicLong(0); + Map> partitionToFilesMap = partitionInfoList.stream().map(p -> { + final String partitionName = p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath(); + partitions.add(partitionName); + totalFiles.addAndGet(p.getTotalFiles()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); final Map> partitionToRecordsMap = new HashMap<>(); // Record which saves the list of all partitions HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); if (partitions.isEmpty()) { - // in case of bootstrapping of a fresh table, there won't be any partitions, but we need to make a boostrap commit + // in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit final HoodieData allPartitionRecordsRDD = engineContext.parallelize( Collections.singletonList(allPartitionRecord), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD); @@ -857,7 +865,7 @@ protected void bootstrapCommit(List partitionInfoList, String cre return; } - HoodieData partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); + HoodieData filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); if (!partitionInfoList.isEmpty()) { HoodieData fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> { Map fileNameToSizeMap = partitionInfo.getFileNameToSizeMap(); @@ -871,27 +879,39 @@ protected void bootstrapCommit(List partitionInfoList, String cre return HoodieMetadataPayload.createPartitionFilesRecord( partitionInfo.getRelativePath().isEmpty() ? 
NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap), Option.empty()); }); - partitionRecords = partitionRecords.union(fileListRecords); + filesPartitionRecords = filesPartitionRecords.union(fileListRecords); + } + ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); + partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); + + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); + } + + if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); } LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata"); - ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, partitionRecords); commit(createInstantTime, partitionToRecordsMap, false); } /** * A class which represents a directory and the files and directories inside it. - * + *
* A {@code PartitionFileInfo} object saves the name of the partition and various properties requires of each file - * required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when - * a very large number of files are present in the dataset being bootstrapped. + * required for initializing the metadata table. Saving limited properties reduces the total memory footprint when + * a very large number of files are present in the dataset being initialized. */ static class DirectoryInfo implements Serializable { // Relative path of the directory (relative to the base directory) private final String relativePath; // Map of filenames within this partition to their respective sizes - private HashMap filenameToSizeMap; + private final HashMap filenameToSizeMap; // List of directories within this partition private final List subDirectories = new ArrayList<>(); // Is this a hoodie partition diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 275ab4f5e0a33..aeb546b0ca5c6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -91,7 +91,7 @@ protected void initialize(HoodieEngineContext eng Option inflightInstantTimestamp) { try { if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); + initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java index 32bca55099eda..8a2958eab9da8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java @@ -20,8 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; +import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; @@ -37,8 +36,6 @@ import org.apache.spark.api.java.function.Function2; import scala.Tuple2; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -113,7 +110,7 @@ protected List computeNext() { } List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); - Map, ByteBuffer> fileToBloomFilterMap = + Map, BloomFilter> fileToBloomFilterMap = hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); final AtomicInteger totalKeys = new AtomicInteger(0); @@ -126,11 +123,7 @@ protected List computeNext() { if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair); } - final ByteBuffer fileBloomFilterByteBuffer = fileToBloomFilterMap.get(partitionPathFileNamePair); - - HoodieDynamicBoundedBloomFilter fileBloomFilter = - new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(fileBloomFilterByteBuffer).toString(), - BloomFilterTypeCode.DYNAMIC_V0); + final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair); List candidateRecordKeys = new ArrayList<>(); hoodieKeyList.forEach(hoodieKey -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 1a32ae5e9aa26..d0e0c9532f0d4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -113,7 +113,7 @@ protected void initialize(HoodieEngineContext eng }); if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); + initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 6ce6314bbad2f..2a535d36fbfdf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -165,6 +165,12 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "used for pruning files during the index lookups. Only applies if " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled.A"); + public static final ConfigProperty COLUMN_STATS_INDEX_PARALLELISM = ConfigProperty + .key(METADATA_PREFIX + ".index.column.stats.parallelism") + .defaultValue(1) + .sinceVersion("0.11.0") + .withDocumentation("Parallelism to use, when generating column stats index."); + public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") .defaultValue(false) @@ -218,6 +224,10 @@ public int getColumnStatsIndexFileGroupCount() { return getIntOrDefault(METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT); } + public int getColumnStatsIndexParallelism() { + return getIntOrDefault(COLUMN_STATS_INDEX_PARALLELISM); + } + public boolean enableMetrics() { return getBoolean(METRICS_ENABLE); } @@ -280,6 +290,11 @@ public Builder withMetadataIndexColumnStatsFileGroupCount(int fileGroupCount) { return this; } + public Builder withColumnStatsIndexParallelism(int parallelism) { + metadataConfig.setValue(COLUMN_STATS_INDEX_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withMetadataIndexForAllColumns(boolean enable) { metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS, String.valueOf(enable)); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index f25d76813357e..1ffe199229828 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,12 +18,16 @@ package org.apache.hudi.common.model; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; import java.util.Objects; +import java.util.function.BiFunction; /** * Hoodie Range metadata. 
*/ -public class HoodieColumnRangeMetadata { +public class HoodieColumnRangeMetadata implements Serializable { private final String filePath; private final String columnName; private final T minValue; @@ -33,6 +37,20 @@ public class HoodieColumnRangeMetadata { private final long totalSize; private final long totalUncompressedSize; + public static final BiFunction, HoodieColumnRangeMetadata, HoodieColumnRangeMetadata> COLUMN_RANGE_MERGE_FUNCTION = + (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>( + newColumnRange.getFilePath(), + newColumnRange.getColumnName(), + (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), + (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), + oldColumnRange.getNullCount() + newColumnRange.getNullCount(), + oldColumnRange.getValueCount() + newColumnRange.getValueCount(), + oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(), + oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize() + ); + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) { this.filePath = filePath; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java index c97743f4d115e..cf3bb52263366 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java @@ -19,9 +19,12 @@ package org.apache.hudi.common.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.hudi.common.util.Option; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Statistics about a single Hoodie delta log operation. 
@@ -33,6 +36,7 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat { private long logOffset; private String baseFile; private List logFiles = new ArrayList<>(); + private Option> recordsStats = Option.empty(); public void setLogVersion(int logVersion) { this.logVersion = logVersion; @@ -69,4 +73,24 @@ public void addLogFiles(String logFile) { public List getLogFiles() { return logFiles; } + + public void setRecordsStats(RecordsStats stats) { + recordsStats = Option.of(stats); + } + + public Option> getRecordsStats() { + return recordsStats; + } + + public static class RecordsStats implements Serializable { + private final T recordsStats; + + public RecordsStats(T recordsStats) { + this.recordsStats = recordsStats; + } + + public T getStats() { + return recordsStats; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 459397bdcb8ef..0eab97abd9776 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -21,6 +21,8 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -46,6 +48,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -144,9 +147,8 @@ public Map getAllFilesInPartitions(List partitions throws IOException { if (isMetadataTableEnabled) { try { - List partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList()); - Map partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths); - return partitionsFilesMap; + List partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList()); + return fetchAllFilesInPartitionPaths(partitionPaths); } catch (Exception e) { throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e); } @@ -157,7 +159,7 @@ public Map getAllFilesInPartitions(List partitions } @Override - public Option getBloomFilter(final String partitionName, final String fileName) + public Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException { if (!isBloomFilterIndexEnabled) { LOG.error("Metadata bloom filter index is disabled!"); @@ -165,7 +167,7 @@ public Option getBloomFilter(final String partitionName, final Strin } final Pair partitionFileName = Pair.of(partitionName, fileName); - Map, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + Map, BloomFilter> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); if (bloomFilters.isEmpty()) { LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); return Option.empty(); @@ -176,7 +178,7 @@ public Option getBloomFilter(final String partitionName, final Strin } @Override - public Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + public Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws 
HoodieMetadataException { if (!isBloomFilterIndexEnabled) { LOG.error("Metadata bloom filter index is disabled!"); @@ -203,7 +205,7 @@ public Map, ByteBuffer> getBloomFilters(final List m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, (timer.endTimer() / partitionIDFileIDStrings.size()))); - Map, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>(); + Map, BloomFilter> partitionFileToBloomFilterMap = new HashMap<>(); for (final Pair>> entry : hoodieRecordList) { if (entry.getRight().isPresent()) { final Option bloomFilterMetadata = @@ -211,7 +213,11 @@ public Map, ByteBuffer> getBloomFilters(final List, HoodieMetadataColumnStats> getColumnStats(final /** * Returns a list of all partitions. */ - protected List fetchAllPartitionPaths() throws IOException { + protected List fetchAllPartitionPaths() { HoodieTimer timer = new HoodieTimer().startTimer(); Option> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index a4e5ea3539f17..1bb18bad16e40 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -33,7 +34,6 @@ import org.apache.hudi.exception.HoodieMetadataException; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -143,13 +143,13 @@ public void reset() { // no-op } - public Option getBloomFilter(final String partitionName, final String fileName) + public Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException { throw new HoodieMetadataException("Unsupported operation: getBloomFilter for " + fileName); } @Override - public Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + public Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws HoodieMetadataException { throw new HoodieMetadataException("Unsupported operation: getBloomFilters!"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 6000e04f3c90a..fca80fdabf2d9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; import org.apache.hudi.avro.model.HoodieMetadataRecord; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -49,9 +48,12 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import 
java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -111,7 +113,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload COLUMN_STATS_MERGE_FUNCTION = + (oldColumnStats, newColumnStats) -> { + ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName())); + if (newColumnStats.getIsDeleted()) { + return newColumnStats; + } + return new HoodieMetadataColumnStats( + newColumnStats.getFileName(), + Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), + Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), + oldColumnStats.getNullCount() + newColumnStats.getNullCount(), + oldColumnStats.getValueCount() + newColumnStats.getValueCount(), + oldColumnStats.getTotalSize() + newColumnStats.getTotalSize(), + oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize(), + newColumnStats.getIsDeleted() + ); + }; + public HoodieMetadataPayload(GenericRecord record, Comparable orderingVal) { this(Option.of(record)); } @@ -135,7 +155,7 @@ public HoodieMetadataPayload(Option record) { filesystemMetadata = (Map) record.get().get("filesystemMetadata"); filesystemMetadata.keySet().forEach(k -> { GenericRecord v = filesystemMetadata.get(k); - filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted"))); + filesystemMetadata.put(k, new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted"))); }); } @@ -158,7 +178,7 @@ public HoodieMetadataPayload(Option record) { throw new HoodieMetadataException("Valid " + SCHEMA_FIELD_ID_COLUMN_STATS + " record expected for type: " + METADATA_TYPE_COLUMN_STATS); } columnStatMetadata = new HoodieMetadataColumnStats( - (String) v.get(COLUMN_STATS_FIELD_RESOURCE_NAME), + (String) v.get(COLUMN_STATS_FIELD_FILE_NAME), (String) v.get(COLUMN_STATS_FIELD_MIN_VALUE), (String) v.get(COLUMN_STATS_FIELD_MAX_VALUE), (Long) v.get(COLUMN_STATS_FIELD_NULL_COUNT), @@ -243,6 +263,7 @@ public static HoodieRecord createPartitionFilesRecord(Str public static HoodieRecord createBloomFilterMetadataRecord(final String partitionName, final String baseFileName, final String timestamp, + final String bloomFilterType, final ByteBuffer bloomFilter, final boolean isDeleted) { ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) @@ -252,10 +273,8 @@ public static HoodieRecord createBloomFilterMetadataRecor .concat(new FileIndexID(baseFileName).asBase64EncodedString()); HoodieKey key = new HoodieKey(bloomFilterIndexKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); - // TODO: HUDI-3203 Get the bloom filter type from the file HoodieMetadataBloomFilter metadataBloomFilter = - new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(), - timestamp, bloomFilter, isDeleted); + new HoodieMetadataBloomFilter(bloomFilterType, timestamp, bloomFilter, isDeleted); HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(), HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter); return new HoodieAvroRecord<>(key, metadataPayload); @@ -275,18 +294,23 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { HoodieMetadataBloomFilter 
combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord); return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata); case METADATA_TYPE_COLUMN_STATS: - return new HoodieMetadataPayload(key, type, combineColumnStatsMetadatat(previousRecord)); + return new HoodieMetadataPayload(key, type, combineColumnStatsMetadata(previousRecord)); default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } } private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) { + // Bloom filters are always additive. No need to merge with previous bloom filter return this.bloomFilterMetadata; } - private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload previousRecord) { - return this.columnStatMetadata; + private HoodieMetadataColumnStats combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) { + ValidationUtils.checkArgument(previousRecord.getColumnStatMetadata().isPresent()); + ValidationUtils.checkArgument(getColumnStatMetadata().isPresent()); + ValidationUtils.checkArgument(previousRecord.getColumnStatMetadata().get() + .getFileName().equals(this.columnStatMetadata.getFileName())); + return COLUMN_STATS_MERGE_FUNCTION.apply(previousRecord.getColumnStatMetadata().get(), this.columnStatMetadata); } @Override @@ -321,7 +345,7 @@ public Option getInsertValue(Schema schema) throws IOException { * Returns the list of filenames added as part of this record. */ public List getFilenames() { - return filterFileInfoEntries(false).map(e -> e.getKey()).sorted().collect(Collectors.toList()); + return filterFileInfoEntries(false).map(Map.Entry::getKey).sorted().collect(Collectors.toList()); } /** @@ -390,9 +414,7 @@ private Map combineFilesystemMetadata(HoodieMeta combinedFileInfo.remove(filename); } else { // file appends. 
- combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> { - return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false); - }); + combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false)); } } }); @@ -461,8 +483,6 @@ public static Stream createColumnStatsRecords( .build()); return new HoodieAvroRecord<>(key, payload); }); - - } @Override @@ -475,9 +495,9 @@ public String toString() { if (type == METADATA_TYPE_BLOOM_FILTER) { ValidationUtils.checkState(getBloomFilterMetadata().isPresent()); sb.append("BloomFilter: {"); - sb.append("bloom size: " + getBloomFilterMetadata().get().getBloomFilter().array().length).append(", "); - sb.append("timestamp: " + getBloomFilterMetadata().get().getTimestamp()).append(", "); - sb.append("deleted: " + getBloomFilterMetadata().get().getIsDeleted()); + sb.append("bloom size: ").append(getBloomFilterMetadata().get().getBloomFilter().array().length).append(", "); + sb.append("timestamp: ").append(getBloomFilterMetadata().get().getTimestamp()).append(", "); + sb.append("deleted: ").append(getBloomFilterMetadata().get().getIsDeleted()); sb.append("}"); } if (type == METADATA_TYPE_COLUMN_STATS) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 6a5df050a0b29..da97e743870ba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -31,7 +32,6 @@ import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -113,20 +113,20 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad * * @param partitionName - Partition name * @param fileName - File name for which bloom filter needs to be retrieved - * @return BloomFilter byte buffer if available, otherwise empty + * @return BloomFilter if available, otherwise empty * @throws HoodieMetadataException */ - Option getBloomFilter(final String partitionName, final String fileName) + Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException; /** * Get bloom filters for files from the metadata table index. 
* * @param partitionNameFileNameList - List of partition and file name pair for which bloom filters need to be retrieved - * @return Map of partition file name pair to its bloom filter byte buffer + * @return Map of partition file name pair to its bloom filter * @throws HoodieMetadataException */ - Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws HoodieMetadataException; /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 5fd59bc932872..70fe2e76eb4dd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,9 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -45,20 +42,25 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -102,37 +104,27 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont /** * Convert commit action to metadata records for the enabled partition types. * - * @param commitMetadata - Commit action metadata - * @param dataMetaClient - Meta client for the data table - * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta indexing? 
- * @param instantTime - Action instant time + * @param commitMetadata - Commit action metadata + * @param instantTime - Action instant time + * @param recordsGenerationParams - Parameters for the record generation * @return Map of partition to metadata records for the commit action */ public static Map> convertMetadataToRecords( - HoodieEngineContext context, List enabledPartitionTypes, - HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, - boolean isMetaIndexColumnStatsForAllColumns, String instantTime) { + HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime, + MetadataRecordsGenerationParams recordsGenerationParams) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = context.parallelize( convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(commitMetadata, - dataMetaClient, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = context.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(context, commitMetadata, instantTime, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecords); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertMetadataToColumnStatsRecords(commitMetadata, context, - dataMetaClient, isMetaIndexColumnStatsForAllColumns, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = context.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; } @@ -186,94 +178,90 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo /** * Convert commit action metadata to bloom filter records. 
* - * @param commitMetadata - Commit action metadata - * @param dataMetaClient - Meta client for the data table - * @param instantTime - Action instant time - * @return List of metadata table records + * @param context - Engine context to use + * @param commitMetadata - Commit action metadata + * @param instantTime - Action instant time + * @param recordsGenerationParams - Parameters for bloom filter record generation + * @return HoodieData of metadata table records */ - public static List convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata, - HoodieTableMetaClient dataMetaClient, - String instantTime) { - List records = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - // No action for delta logs - if (hoodieWriteStat instanceof HoodieDeltaWriteStat) { - return; - } + public static HoodieData convertMetadataToBloomFilterRecords( + HoodieEngineContext context, HoodieCommitMetadata commitMetadata, + String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { + final List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + if (allWriteStats.isEmpty()) { + return context.emptyHoodieData(); + } - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } - int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : - partition.length() + 1; + HoodieData allWriteStatsRDD = context.parallelize(allWriteStats, + Math.max(recordsGenerationParams.getBloomIndexParallelism(), allWriteStats.size())); + return allWriteStatsRDD.flatMap(hoodieWriteStat -> { + final String partition = hoodieWriteStat.getPartitionPath(); - final String fileName = pathWithPartition.substring(offset); - if (!FSUtils.isBaseFile(new Path(fileName))) { - return; - } - ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate files in HoodieCommitMetadata"); + // For bloom filter index, delta writes do not change the base file bloom filter entries + if (hoodieWriteStat instanceof HoodieDeltaWriteStat) { + return Collections.emptyListIterator(); + } + + String pathWithPartition = hoodieWriteStat.getPath(); + if (pathWithPartition == null) { + // Empty partition + LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat); + return Collections.emptyListIterator(); + } + int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 
1 : 0) : + partition.length() + 1; - final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); + final String fileName = pathWithPartition.substring(offset); + if (!FSUtils.isBaseFile(new Path(fileName))) { + return Collections.emptyListIterator(); + } + + final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); + try (HoodieFileReader fileReader = + HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) { try { - HoodieFileReader fileReader = - HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath); - try { - final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); - if (fileBloomFilter == null) { - LOG.error("Failed to read bloom filter for " + writeFilePath); - return; - } - ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); - HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, fileName, instantTime, bloomByteBuffer, false); - records.add(record); - } catch (Exception e) { + final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); + if (fileBloomFilter == null) { LOG.error("Failed to read bloom filter for " + writeFilePath); - return; + return Collections.emptyListIterator(); } + ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); + HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false); + return Collections.singletonList(record).iterator(); + } catch (Exception e) { + LOG.error("Failed to read bloom filter for " + writeFilePath); + return Collections.emptyListIterator(); + } finally { fileReader.close(); - } catch (IOException e) { - LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat); } - }); + } catch (IOException e) { + LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat); + } + return Collections.emptyListIterator(); }); - - return records; } /** * Convert the clean action to metadata records. 
*/ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieCleanMetadata cleanMetadata, HoodieTableMetaClient dataMetaClient, String instantTime) { + HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, + MetadataRecordsGenerationParams recordsGenerationParams, String instantTime) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = engineContext.parallelize( convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(cleanMetadata, - engineContext, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecordsRDD = convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, - dataMetaClient); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -309,14 +297,16 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCl /** * Convert clean metadata to bloom filter index records. 
 *
- * @param cleanMetadata - Clean action metadata
- * @param engineContext - Engine context
- * @param instantTime   - Clean action instant time
- * @return List of bloom filter index records for the clean metadata
+ * @param cleanMetadata           - Clean action metadata
+ * @param engineContext           - Engine context
+ * @param instantTime             - Clean action instant time
+ * @param recordsGenerationParams - Parameters for bloom filter record generation
+ * @return HoodieData of bloom filter index records for the clean metadata
   */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       String instantTime) {
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             HoodieEngineContext engineContext,
+                                                                             String instantTime,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
       // Files deleted from a partition
@@ -329,23 +319,24 @@ public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieClean
       });
     });
 
-    return engineContext.map(deleteFileList, deleteFileInfo -> {
-      return HoodieMetadataPayload.createBloomFilterMetadataRecord(
-          deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true);
-    }, 1).stream().collect(Collectors.toList());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()));
+    return deleteFileListRDD.map(deleteFileInfo -> HoodieMetadataPayload.createBloomFilterMetadataRecord(
+        deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, StringUtils.EMPTY_STRING,
+        ByteBuffer.allocate(0), true));
   }
 
   /**
   * Convert clean metadata to column stats index records.
 *
- * @param cleanMetadata     - Clean action metadata
- * @param engineContext     - Engine context
- * @param datasetMetaClient - data table meta client
- * @return List of column stats index records for the clean metadata
+ * @param cleanMetadata           - Clean action metadata
+ * @param engineContext           - Engine context
+ * @param recordsGenerationParams - Parameters for column stats record generation
+ * @return HoodieData of column stats index records for the clean metadata
   */
-  public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-                                                                       HoodieEngineContext engineContext,
-                                                                       HoodieTableMetaClient datasetMetaClient) {
+  public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+                                                                             HoodieEngineContext engineContext,
+                                                                             MetadataRecordsGenerationParams recordsGenerationParams) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
       // Files deleted from a partition
@@ -353,54 +344,42 @@ public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieClean
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
     });
 
-    List<String> latestColumns = getLatestColumns(datasetMetaClient);
-    return engineContext.flatMap(deleteFileList,
-        deleteFileInfo -> {
-          if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-            return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), datasetMetaClient,
-                latestColumns, true);
-          }
-          return Stream.empty();
-        }, 1).stream().collect(Collectors.toList());
+    final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
+    HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList,
+        Math.max(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
+    return deleteFileListRDD.flatMap(deleteFileInfo -> {
+      if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+        return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), recordsGenerationParams.getDataMetaClient(), columnsToIndex, true).iterator();
+      }
+      return Collections.emptyListIterator();
+    });
   }
 
   /**
   * Convert restore action metadata to metadata table records.
*/ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieActiveTimeline metadataTableTimeline, HoodieRestoreMetadata restoreMetadata, - HoodieTableMetaClient dataMetaClient, String instantTime, Option lastSyncTs) { + HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, HoodieRestoreMetadata restoreMetadata, + MetadataRecordsGenerationParams recordsGenerationParams, String instantTime, Option lastSyncTs) { final Map> partitionToRecordsMap = new HashMap<>(); final Map> partitionToAppendedFiles = new HashMap<>(); final Map> partitionToDeletedFiles = new HashMap<>(); - processRestoreMetadata(metadataTableTimeline, restoreMetadata, - partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs); - - final HoodieData filesPartitionRecordsRDD = engineContext.parallelize( - convertFilesToFilesPartitionRecords(partitionToDeletedFiles, - partitionToAppendedFiles, instantTime, "Restore"), 1); + processRestoreMetadata(metadataTableTimeline, restoreMetadata, partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs); + final HoodieData filesPartitionRecordsRDD = + engineContext.parallelize(convertFilesToFilesPartitionRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertFilesToBloomFilterRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecordsRDD = + convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertFilesToColumnStatsRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } - return partitionToRecordsMap; } @@ -416,44 +395,35 @@ private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTim Map> partitionToAppendedFiles, Map> partitionToDeletedFiles, Option lastSyncTs) { - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, - partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)); 
- }); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, + partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs))); } /** * Convert rollback action metadata to metadata table records. */ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata, - HoodieTableMetaClient dataMetaClient, String instantTime, Option lastSyncTs, boolean wasSynced) { + HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, + HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams, + String instantTime, Option lastSyncTs, boolean wasSynced) { final Map> partitionToRecordsMap = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); Map> partitionToAppendedFiles = new HashMap<>(); - List filesPartitionRecords = convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, - partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); + + List filesPartitionRecords = + convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); final HoodieData rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertFilesToBloomFilterRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecordsRDD = + convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertFilesToColumnStatsRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -608,82 +578,124 @@ private static List convertFilesToFilesPartitionRecords(Map convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - Map> partitionToDeletedFiles, - Map> 
partitionToAppendedFiles, - String instantTime) { - List records = new LinkedList<>(); - partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { - if (!FSUtils.isBaseFile(new Path(deletedFile))) { - return; - } + public static HoodieData convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, + Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles, + MetadataRecordsGenerationParams recordsGenerationParams, + String instantTime) { + HoodieData allRecordsRDD = engineContext.emptyHoodieData(); + + List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() + .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); + HoodieData>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, + Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism())); + + HoodieData deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> { + final String partitionName = partitionToDeletedFilesEntry.getLeft(); + final List deletedFileList = partitionToDeletedFilesEntry.getRight(); + return deletedFileList.stream().flatMap(deletedFile -> { + if (!FSUtils.isBaseFile(new Path(deletedFile))) { + return Stream.empty(); + } - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; - records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, deletedFile, instantTime, ByteBuffer.allocate(0), true)); - })); + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; + return Stream.of(HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true)); + }).iterator(); + }); + allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); - partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { + List>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet() + .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + HoodieData>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, + Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getBloomIndexParallelism())); + + HoodieData appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> { + final String partitionName = partitionToAppendedFilesEntry.getKey(); + final Map appendedFileMap = partitionToAppendedFilesEntry.getValue(); final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; - appendedFileMap.forEach((appendedFile, length) -> { + return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> { + final String appendedFile = appendedFileLengthPairEntry.getKey(); if (!FSUtils.isBaseFile(new Path(appendedFile))) { - return; + return Stream.empty(); } final String pathWithPartition = partitionName + "/" + appendedFile; - final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); - try { - HoodieFileReader fileReader = - HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath); + final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); + try (HoodieFileReader fileReader = + HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) { final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); if (fileBloomFilter == null) { LOG.error("Failed to read bloom filter for " + appendedFilePath); - return; + return Stream.empty(); } ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, appendedFile, instantTime, bloomByteBuffer, false); - records.add(record); - fileReader.close(); + partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false); + return Stream.of(record); } catch (IOException e) { LOG.error("Failed to get bloom filter for file: " + appendedFilePath); } - }); + return Stream.empty(); + }).iterator(); }); - return records; + allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD); + + return allRecordsRDD; } /** - * Convert rollback action metadata to column stats index records. + * Convert added and deleted action metadata to column stats index records. */ - private static List convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, - HoodieTableMetaClient datasetMetaClient, - Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, - String instantTime) { - List records = new LinkedList<>(); - List latestColumns = getLatestColumns(datasetMetaClient); - partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { + public static HoodieData convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, + Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles, + MetadataRecordsGenerationParams recordsGenerationParams) { + HoodieData allRecordsRDD = engineContext.emptyHoodieData(); + final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); + + final List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() + .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); + final HoodieData>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, + Math.max(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism())); + + HoodieData deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> { + final String partitionName = partitionToDeletedFilesEntry.getLeft(); final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; - if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + final List deletedFileList = partitionToDeletedFilesEntry.getRight(); + + return deletedFileList.stream().flatMap(deletedFile -> { final String filePathWithPartition = partitionName + "/" + deletedFile; - records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient, - latestColumns, true).collect(Collectors.toList())); - } - })); - - partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach( - (appendedFile, size) -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; - if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - final String filePathWithPartition = partitionName + "/" + appendedFile; - records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient, - latestColumns, false).collect(Collectors.toList())); - } - })); - return records; + return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, true); + }).iterator(); + }); + allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); + + final List>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet() + .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + final HoodieData>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, + Math.max(partitionToAppendedFiles.size(), recordsGenerationParams.getColumnStatsIndexParallelism())); + + HoodieData appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> { + final String partitionName = partitionToAppendedFilesEntry.getLeft(); + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; + final Map appendedFileMap = partitionToAppendedFilesEntry.getRight(); + + return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthPair -> { + // TODO: HUDI-3374 Handle log files without delta write stat to get records column stats + if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthPair.getKey())) + || !appendedFileNameLengthPair.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return Stream.empty(); + } + final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthPair.getKey(); + return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, false); + }).iterator(); + + }); + allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD); + + return allRecordsRDD; } /** @@ -748,7 +760,7 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient if (timeline.empty()) { final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime()); - timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails); + timeline = new HoodieDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline()::getInstantDetails); } return new HoodieTableFileSystemView(metaClient, timeline); } @@ -776,20 +788,16 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta } else { fileSliceStream = fsView.getLatestFileSlices(partition); } - return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList()); + return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); } - public static List convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, - HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - boolean isMetaIndexColumnStatsForAllColumns, - String instantTime) { - + public static HoodieData convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, + HoodieEngineContext engineContext, + MetadataRecordsGenerationParams recordsGenerationParams) { try { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(entry -> entry.stream()).collect(Collectors.toList()); - return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, dataMetaClient, allWriteStats, - isMetaIndexColumnStatsForAllColumns); + return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams); } catch (Exception e) { throw new HoodieException("Failed to generate column stats records for metadata table ", e); } @@ -798,30 +806,20 @@ public static List convertMetadataToColumnStatsRecords(HoodieCommi /** * Create column stats from write status. 
 *
- * @param engineContext                       - Enging context
- * @param datasetMetaClient                   - Dataset meta client
- * @param allWriteStats                       - Write status to convert
- * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing
+ * @param engineContext           - Engine context
+ * @param allWriteStats           - Write stats to convert
+ * @param recordsGenerationParams - Parameters for column stats record generation
   */
-  public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
-                                                                   HoodieTableMetaClient datasetMetaClient,
-                                                                   List<HoodieWriteStat> allWriteStats,
-                                                                   boolean isMetaIndexColumnStatsForAllColumns) throws Exception {
+  public static HoodieData<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                         List<HoodieWriteStat> allWriteStats,
+                                                                         MetadataRecordsGenerationParams recordsGenerationParams) {
     if (allWriteStats.isEmpty()) {
-      return Collections.emptyList();
+      return engineContext.emptyHoodieData();
     }
-
-    List<HoodieWriteStat> prunedWriteStats = allWriteStats.stream().filter(writeStat -> {
-      return !(writeStat instanceof HoodieDeltaWriteStat);
-    }).collect(Collectors.toList());
-    if (prunedWriteStats.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    return engineContext.flatMap(prunedWriteStats,
-        writeStat -> translateWriteStatToColumnStats(writeStat, datasetMetaClient,
-            getLatestColumns(datasetMetaClient, isMetaIndexColumnStatsForAllColumns)),
-        prunedWriteStats.size());
+    HoodieData<HoodieWriteStat> allWriteStatsRDD = engineContext.parallelize(
+        allWriteStats, Math.max(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()));
+    return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(),
+        getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled())).iterator());
   }
 
   /**
@@ -830,7 +828,7 @@ public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineCon
   * @param datasetMetaClient - Data table meta client
   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns
   */
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+  private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
     if (!isMetaIndexColumnStatsForAllColumns
         || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) {
       return Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
@@ -847,27 +845,26 @@ private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaCl
     }
   }
 
-  private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) {
-    return getLatestColumns(datasetMetaClient, false);
-  }
-
   public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                      HoodieTableMetaClient datasetMetaClient,
-                                                                     List<String> latestColumns) {
-    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false);
-
+                                                                     List<String> columnsToIndex) {
+    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<String>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
+      List<HoodieColumnRangeMetadata<String>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
+      return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+    }
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
   }
 
   private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
                                                      HoodieTableMetaClient datasetMetaClient,
-                                                     List<String> columns, boolean isDeleted) {
+                                                     List<String> columnsToIndex, boolean isDeleted) {
     final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionPath;
     final int offset = partition.equals(NON_PARTITIONED_NAME)
        ? (filePathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
     final String fileName = filePathWithPartition.substring(offset);
-    if (!FSUtils.isBaseFile(new Path(fileName))) {
-      return Stream.empty();
-    }
 
     if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>();
@@ -875,13 +872,13 @@ private static Stream<HoodieRecord> getColumnStats(final String partitionPath, f
       if (!isDeleted) {
         try {
           columnRangeMetadataList = new ParquetUtils().readRangeFromParquetMetadata(
-              datasetMetaClient.getHadoopConf(), fullFilePath, columns);
+              datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
         } catch (Exception e) {
           LOG.error("Failed to read column stats for " + fullFilePath, e);
         }
       } else {
         columnRangeMetadataList =
-            columns.stream().map(entry -> new HoodieColumnRangeMetadata(fileName,
+            columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata(fileName,
                 entry, null, null, 0, 0, 0, 0))
                 .collect(Collectors.toList());
       }
@@ -920,5 +917,4 @@ public static int getPartitionFileGroupCount(final MetadataPartitionType partiti
         return 1;
     }
   }
-
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
new file mode 100644
index 0000000000000..f653fb30fe3f5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Encapsulates all parameters required to generate metadata index for enabled index types.
+ */
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;
+  private final List<MetadataPartitionType> enabledPartitionTypes;
+  private final String bloomFilterType;
+  private final int bloomIndexParallelism;
+  private final boolean isAllColumnStatsIndexEnabled;
+  private final int columnStatsIndexParallelism;
+
+  MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism,
+                                  boolean isAllColumnStatsIndexEnabled, int columnStatsIndexParallelism) {
+    this.dataMetaClient = dataMetaClient;
+    this.enabledPartitionTypes = enabledPartitionTypes;
+    this.bloomFilterType = bloomFilterType;
+    this.bloomIndexParallelism = Math.max(bloomIndexParallelism, 1);
+    this.isAllColumnStatsIndexEnabled = isAllColumnStatsIndexEnabled;
+    this.columnStatsIndexParallelism = columnStatsIndexParallelism;
+  }
+
+  public HoodieTableMetaClient getDataMetaClient() {
+    return dataMetaClient;
+  }
+
+  public List<MetadataPartitionType> getEnabledPartitionTypes() {
+    return enabledPartitionTypes;
+  }
+
+  public String getBloomFilterType() {
+    return bloomFilterType;
+  }
+
+  public boolean isAllColumnStatsIndexEnabled() {
+    return isAllColumnStatsIndexEnabled;
+  }
+
+  public int getBloomIndexParallelism() {
+    return bloomIndexParallelism;
+  }
+
+  public int getColumnStatsIndexParallelism() {
+    return columnStatsIndexParallelism;
+  }
+}
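
Note: a minimal sketch of how a caller in org.apache.hudi.metadata might assemble the new parameter holder and fan a commit out into per-partition metadata records, based only on the signatures in this patch. The helper name, the literal parallelism values, and the choice of BloomFilterTypeCode.DYNAMIC_V0 are illustrative assumptions (in practice these come from the write config); imports are elided.

    // Hypothetical helper, same package as MetadataRecordsGenerationParams since its constructor is package-private.
    static Map<MetadataPartitionType, HoodieData<HoodieRecord>> toMetadataRecords(
        HoodieEngineContext engineContext, HoodieCommitMetadata commitMetadata, String instantTime,
        HoodieTableMetaClient dataMetaClient) {
      MetadataRecordsGenerationParams recordsGenerationParams = new MetadataRecordsGenerationParams(
          dataMetaClient,
          Arrays.asList(MetadataPartitionType.FILES, MetadataPartitionType.BLOOM_FILTERS, MetadataPartitionType.COLUMN_STATS),
          BloomFilterTypeCode.DYNAMIC_V0.name(), // bloom filter type code stamped on bloom filter records (assumed)
          10,                                    // bloom filter index parallelism (normally from write config)
          false,                                 // index only the record key column rather than all columns
          10);                                   // column stats index parallelism (normally from write config)
      // Each enabled partition type gets a lazily evaluated HoodieData partitioned by the configured
      // parallelism, instead of a List materialized on the driver.
      return HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, commitMetadata, instantTime, recordsGenerationParams);
    }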