diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 91524f1751afe..4f03eb97309e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -193,6 +193,12 @@ public class HoodieIndexConfig extends HoodieConfig { .key("hoodie.simple.index.update.partition.path") .defaultValue("false") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH + ", but for simple index."); + + + public static final ConfigProperty RANGE_INDEX_ENABLED = ConfigProperty + .key("hoodie.range.index.enabled") + .defaultValue(false) + .withDocumentation("Enable storing range index"); private EngineType engineType; @@ -245,6 +251,11 @@ public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) { return this; } + public Builder withRangeIndexEnabled(boolean enableRangeIndex) { + hoodieIndexConfig.setValue(RANGE_INDEX_ENABLED, String.valueOf(enableRangeIndex)); + return this; + } + public Builder bloomFilterNumEntries(int numEntries) { hoodieIndexConfig.setValue(BLOOM_FILTER_NUM_ENTRIES, String.valueOf(numEntries)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 6e87257b3f428..ebfb74762c31e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -373,6 +373,11 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. " + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data"); + public static final ConfigProperty TABLE_REQUIRES_SORTING = ConfigProperty + .key("hoodie.table.sort.key") + .defaultValue(false) + .withDocumentation("optional configuration to require sorting based on key"); + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -933,6 +938,13 @@ public boolean getGlobalSimpleIndexUpdatePartitionPath() { return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH); } + /** + * Secondary index config. + */ + public Boolean getIsRangeIndexEnabled() { + return getBoolean(HoodieIndexConfig.RANGE_INDEX_ENABLED); + } + /** * storage properties. 
*/ @@ -1304,6 +1316,10 @@ public String getPreCommitValidatorInequalitySqlQueries() { public boolean allowEmptyCommit() { return getBooleanOrDefault(ALLOW_EMPTY_COMMIT); } + + public boolean requiresSorting() { + return getBooleanOrDefault(TABLE_REQUIRES_SORTING); + } public static class Builder { @@ -1611,6 +1627,11 @@ public Builder withPopulateMetaFields(boolean populateMetaFields) { return this; } + public Builder withRequiresSorting(boolean tableNeedsSorting) { + writeConfig.setValue(HoodieWriteConfig.TABLE_REQUIRES_SORTING, Boolean.toString(tableNeedsSorting)); + return this; + } + public Builder withProperties(Properties properties) { this.writeConfig.getProps().putAll(properties); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieSecondaryIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieSecondaryIndex.java new file mode 100644 index 0000000000000..678c226092656 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieSecondaryIndex.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index; + +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.PublicAPIMethod; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.io.Serializable; + +/** + * Base class for different types of secondary indexes. + * This is different from primary index because + * a) index lookup operation is very different (using predicates instead of sending all keys). + * b) There can be multiple secondary index configured for same table. (one column may use range index/some other column may use bloom index) + * + * @param Sub type of HoodieRecordPayload + * @param Type of inputs + * @param Type of keys + * @param Type of outputs + */ +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) +public abstract class HoodieSecondaryIndex implements Serializable { + + protected final HoodieWriteConfig config; + + protected final HoodieEngineContext engineContext; + + protected HoodieSecondaryIndex(HoodieWriteConfig config, HoodieEngineContext engineContext) { + this.config = config; + this.engineContext = engineContext; + } + + /** + * Update index to quickly identify records. 
+ */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract void updateIndex(HoodieWriteMetadata writeMetadata, String instantTime, + HoodieTable hoodieTable) throws HoodieIndexException; + + /** + * Rollback the effects of the commit made at instantTime. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public abstract boolean rollbackCommit(String instantTime); + + /** + * TODO figure out the signature for this + public abstract List findMatchingDataFiles(List predicates) + */ + /** + * Each index type should implement it's own logic to release any resources acquired during the process. + */ + public void close() { + } + + public enum SecondaryIndexType { + /* TODO: also support multiple secondary index on same table - different columns can have different values */ + RANGE, + BLOOM, /* TODO: we can leverage same bloom index used for primary key */ + CUCKOO, /* TODO: implementation, For medium cardinality, cuckoo index likely performs better than bloom index https://www.vldb.org/pvldb/vol13/p3559-kipf.pdf */ + POINT /* TODO: Finding exact value quickly. maybe useful for primary key lookups too. */ + } + + public HoodieWriteConfig getWriteConfig() { + return this.config; + } + + public HoodieEngineContext getEngineContext() { + return this.engineContext; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 9cd5301fb5c8f..f4eba5eb84936 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -18,6 +18,10 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieMetadataRecord; @@ -29,6 +33,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -51,16 +56,13 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -98,10 +100,17 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC this.hadoopConf = new SerializableConfiguration(hadoopConf); if (writeConfig.useFileListingMetadata()) { + enabled = true; + } else if (writeConfig.getIsRangeIndexEnabled()) { + enabled = true; + } else { + enabled = false; + 
this.metrics = Option.empty(); + } + + if (enabled) { this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX; this.metadataWriteConfig = createMetadataWriteConfig(writeConfig); - enabled = true; - // Inline compaction and auto clean is required as we dont expose this table outside ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table."); ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table."); @@ -112,18 +121,13 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC initRegistry(); HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build(); initialize(engineContext, datasetMetaClient); - if (enabled) { - // This is always called even in case the table was created for the first time. This is because - // initFromFilesystem() does file listing and hence may take a long time during which some new updates - // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync - // with the active timeline. - HoodieTimer timer = new HoodieTimer().startTimer(); - syncFromInstants(datasetMetaClient); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer())); - } - } else { - enabled = false; - this.metrics = Option.empty(); + // This is always called even in case the table was created for the first time. This is because + // initFromFilesystem() does file listing and hence may take a long time during which some new updates + // may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync + // with the active timeline. + HoodieTimer timer = new HoodieTimer().startTimer(); + syncFromInstants(datasetMetaClient); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer())); } } @@ -169,6 +173,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi .withParallelism(parallelism, parallelism) .withDeleteParallelism(parallelism) .withRollbackParallelism(parallelism) + .withRequiresSorting(true) .withFinalizeWriteParallelism(parallelism); if (writeConfig.isMetricsOn()) { @@ -283,7 +288,8 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi // We can only bootstrap if there are no pending operations on the dataset Option pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline() .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst()); - if (pendingInstantOption.isPresent()) { + // This filterCompletedInstants.firstInstant.isPresent check is a hack to make 'quick start' work when metadata is enabled. + if (datasetMetaClient.getCommitsTimeline().filterCompletedInstants().firstInstant().isPresent() && pendingInstantOption.isPresent()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); LOG.warn("Cannot bootstrap metadata table as operation is in progress: " + pendingInstantOption.get()); return false; @@ -291,7 +297,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit // Otherwise, we use the latest commit timestamp. 
- String createInstantTime = datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst() + String createInstantTime = datasetMetaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst() .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); @@ -336,8 +342,18 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi } }); + Map>> rangeStats; + if (datasetWriteConfig.getIsRangeIndexEnabled()) { + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + //TODO this result may be too large to fit into memory. We may need to convert to dataframe and move this to spark-client + rangeStats = HoodieTableMetadataUtil.createRangeIndexInfoFromWriteStats(engineContext, datasetMetaClient, allWriteStats); + } else { + rangeStats = Collections.emptyMap(); + } + LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata"); - update(commitMetadata, createInstantTime); + update(commitMetadata, rangeStats, createInstantTime); return true; } @@ -379,6 +395,8 @@ private Map> getPartitionsToFilesMapping(HoodieTableMet if (p.getRight().length > filesInDir.size()) { String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft()); + LOG.info("Processing partition " + partitionName + " total files: " + filesInDir.size()); + // deal with Non-partition table, we should exclude .hoodie partitionToFileStatus.put(partitionName, filesInDir.stream() .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList())); @@ -416,10 +434,15 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) { for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime()); - if (records.isPresent()) { - commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp()); + //TODO: Can we combine below two lines and generate proper RDD/Dataframe here here? we may have to move this to hudi-spark-client + Option> fileRecords = HoodieTableMetadataUtil.convertInstantToFilesMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime()); + List rangeIndexRecords; + if (datasetWriteConfig.getIsRangeIndexEnabled()) { + rangeIndexRecords = HoodieTableMetadataUtil.convertInstantToRangeMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime()); + } else { + rangeIndexRecords = Collections.emptyList(); } + commit(fileRecords.orElse(Collections.emptyList()), rangeIndexRecords, instant.getTimestamp()); } initTableMetadata(); } catch (IOException ioe) { @@ -497,6 +520,29 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) } } + /** + * Update column range info for new files created by instant. 
+ */ + @Override + public void update(Map>> fileToColumnRangeInfo, String instantTime) { + if (enabled) { + List rangeIndexRecords = HoodieTableMetadataUtil.convertMetadataToRecords(fileToColumnRangeInfo, instantTime, metadata.getSyncedInstantTime()); + commit(Collections.emptyList(), rangeIndexRecords, instantTime); + } + } + + public void update(HoodieCommitMetadata commitMetadata, Map>> fileToColumnRangeInfo, String instantTime) { + List fileListRecords = HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime); + List rangeIndexRecords; + if (!fileToColumnRangeInfo.isEmpty()) { + rangeIndexRecords = HoodieTableMetadataUtil.convertMetadataToRecords(fileToColumnRangeInfo, instantTime, metadata.getSyncedInstantTime()); + } else { + rangeIndexRecords = Collections.emptyList(); + } + + commit(fileListRecords, rangeIndexRecords, instantTime); + } + @Override public void close() throws Exception { if (metadata != null) { @@ -509,4 +555,7 @@ public void close() throws Exception { * */ protected abstract void commit(List records, String partitionName, String instantTime); + + protected abstract void commit(List fileListRecords, List rangeIndexRecords, String instantTime); + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 1b02a3b92b353..3655ead3e61d0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -22,10 +22,13 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.util.Option; import java.io.Serializable; +import java.util.Collection; +import java.util.Map; /** * Interface that supports updating metadata for a given table, as actions complete. @@ -42,6 +45,8 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); + void update(Map>> fileToColumnRangeInfo, String instantTime); + /** * Return the timestamp of the latest instant synced to the metadata table. 
*/ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 19cc010bfb148..6c05ce00a762d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -671,7 +671,7 @@ public String getBaseFileExtension() { } public boolean requireSortedRecords() { - return getBaseFileFormat() == HoodieFileFormat.HFILE; + return getBaseFileFormat() == HoodieFileFormat.HFILE || config.requiresSorting(); } public HoodieEngineContext getContext() { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 298113054dd38..5633599c05bf1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -90,12 +90,17 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie protected void commit(List records, String partitionName, String instantTime) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); List recordRDD = prepRecords(records, partitionName); + commit(recordRDD, Collections.emptyList(), instantTime); + } + @Override + protected void commit(List recordRDD, List rangeRecords, String instantTime) { try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) { writeClient.startCommitWithTime(instantTime); writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); List statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime); + statuses.forEach(writeStatus -> { if (writeStatus.hasErrors()) { throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/dfs/SparkFileToRangeIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/dfs/SparkFileToRangeIndex.java new file mode 100644 index 0000000000000..0af045ff46263 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/dfs/SparkFileToRangeIndex.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.dfs; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieSecondaryIndex; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.spark.api.java.JavaRDD; + +import java.util.Collection; +import java.util.Map; + +/** + * Spark implementation of a secondary index that maps each data file to per-column value ranges, stored in the range index partition of the metadata table. + */ +public class SparkFileToRangeIndex extends HoodieSecondaryIndex>, JavaRDD, JavaRDD> { + + public SparkFileToRangeIndex(HoodieWriteConfig config, HoodieEngineContext engineContext) { + super(config, engineContext); + } + + @Override + public void updateIndex(final HoodieWriteMetadata> writeMetadata, + final String instantTime, + final HoodieTable>, JavaRDD, JavaRDD> hoodieTable) throws HoodieIndexException { + Map>> fileToColumnRangeInfo = HoodieTableMetadataUtil.createRangeIndexInfoFromWriteStats( + engineContext, + hoodieTable.getMetaClient(), + writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collect()); + SparkHoodieBackedTableMetadataWriter.create(hoodieTable.getHadoopConf(), getWriteConfig(), getEngineContext()).update(fileToColumnRangeInfo, instantTime); + } + + @Override + public boolean rollbackCommit(final String instantTime) { + throw new HoodieException("rollback not supported yet on range index"); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index c014e8be57976..493831efd439c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -18,10 +18,12 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -40,8 +42,6 @@ import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -101,8 +101,20 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie @Override protected void commit(List records, String partitionName, String instantTime) { ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); - JavaRDD recordRDD
= prepRecords(records, partitionName); + JavaRDD recordRDD = prepFileListingRecords(records); + commit(recordRDD, instantTime); + } + @Override + protected void commit(List fileListRecords, List rangeIndexRecords, String instantTime) { + JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext(); + JavaRDD fileRecordsRDD = prepFileListingRecords(fileListRecords); + JavaRDD rangeRecordsRDD = prepRangeIndexRecords(rangeIndexRecords, instantTime, "column"); + + commit(jsc.union(fileRecordsRDD, rangeRecordsRDD), instantTime); + } + + private void commit(JavaRDD recordRDD, String instantTime) { try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { writeClient.startCommitWithTime(instantTime); List statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect(); @@ -157,20 +169,18 @@ public Option getLatestSyncedInstantTime() { * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest * base file. */ - private JavaRDD prepRecords(List records, String partitionName) { + private JavaRDD prepFileListingRecords(List records) { HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext); TableFileSystemView.SliceView fsView = table.getSliceView(); - List baseFiles = fsView.getLatestFileSlices(partitionName) + List baseFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()) .map(FileSlice::getBaseFile) .filter(Option::isPresent) .map(Option::get) .collect(Collectors.toList()); // All the metadata fits within a single base file - if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) { - if (baseFiles.size() > 1) { - throw new HoodieMetadataException("Multiple base files found in metadata partition"); - } + if (baseFiles.size() > 1) { + throw new HoodieMetadataException("Multiple base files found in metadata partition"); } JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext(); @@ -197,4 +207,27 @@ private JavaRDD prepRecords(List records, String par return jsc.parallelize(records, 1).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))); } + + /** + * Tag each record with its location. + * We try to keep all relevant records in a single file in the metadata table. The 'relevance' can be defined by one of the columns in 'HoodieKey'. + * + * This is basically a poor man's version of 'bucketing'. Once bucketing is available in hudi, we can use that feature and replace this with bucketing. + * + * For example, let's say the main table schema has 10 columns. We want to store all metadata of column1 in the same file f1 of the metadata table. + * Similarly, metadata for column2 across all partitions of the main table goes into file f2 of the metadata table, and so on. + * + * In that scenario, we can use 'column' as the keyAttribute. + * + * If we know a query is only interested in column1 and column2, then we only need to read files f1 and f2 from the metadata table. + */ + private JavaRDD prepRangeIndexRecords(List records, String instantTime, String keyAttribute) { + JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext(); + return jsc.parallelize(records).map(r -> { + // We want to create a deterministic fileId for a given key.
So generate fileId based on key (key is typically either column_name or partitionPath of main dataset) + String fileIdPfx = FSUtils.createFileIdPfxFromKey(HoodieMetadataPayload.getAttributeFromRecordKey(r.getRecordKey(), keyAttribute)); + r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileIdPfx + "-" + 0)); + return r; + }); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 2bc1f0302798e..bcf46660cd17b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -43,6 +43,8 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.index.HoodieSecondaryIndex; +import org.apache.hudi.index.dfs.SparkFileToRangeIndex; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; @@ -237,6 +239,10 @@ protected JavaRDD updateIndex(JavaRDD writeStatusRDD, JavaRDD statuses = table.getIndex().updateLocation(writeStatusRDD, context, table); result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); + if (config.getIsRangeIndexEnabled()) { + HoodieSecondaryIndex secondaryIndex = new SparkFileToRangeIndex<>(config, context); + secondaryIndex.updateIndex(result, instantTime, table); + } return statuses; } diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc index bf85587a3a7ac..1302fd972c655 100644 --- a/hudi-common/src/main/avro/HoodieMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieMetadata.avsc @@ -51,6 +51,41 @@ ] } }] - } + }, + { "name": "rangeIndexMetadata", + "doc": "Contains information about column ranges for all data files in the table", + "type": ["null", { + "type": "record", + "name": "HoodieRangeIndexInfo", + "fields": [ + { + "name": "columnName", + "type": "string", + "doc": "Column name. Represented as dot string for nested columns" + }, + { + "name": "filePath", + "type": "string", + "doc": "relative file path from basePath. Includes partition name and full fileName" + }, + { + "name": "rangeLow", + "type": ["null","string"], + "doc": "Low end of the range. For now, this is a String. Based on main data table schema, we can convert it to appropriate type" + }, + { + "name": "rangeHigh", + "type": ["null","string"], + "doc": "High end of the range. For now, this is a String. 
Based on main data table schema, we can convert it to appropriate type" + }, + { + "name": "isDeleted", + "type": "boolean", + "doc": "True if this file has been deleted" + } + ] + } + ] + } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index cb778e6826506..ce7eb5360cf18 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -48,6 +48,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -309,6 +310,10 @@ private static PathFilter getExcludeMetaPathFilter() { public static String createNewFileIdPfx() { return UUID.randomUUID().toString(); } + + public static String createFileIdPfxFromKey(String key) { + return UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8)).toString(); + } /** * Get the file extension from the log file. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java new file mode 100644 index 0000000000000..64381a9f01463 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.model; + +import java.util.Objects; + +/** + * Hoodie Range metadata. 
+ */ +public class HoodieColumnRangeMetadata { + + private final String filePath; + private final String columnName; + private final T minValue; + private final T maxValue; + + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue) { + this.filePath = filePath; + this.columnName = columnName; + this.minValue = minValue; + this.maxValue = maxValue; + } + + public String getFilePath() { + return this.filePath; + } + + public String getColumnName() { + return this.columnName; + } + + public T getMinValue() { + return this.minValue; + } + + public T getMaxValue() { + return this.maxValue; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final HoodieColumnRangeMetadata that = (HoodieColumnRangeMetadata) o; + return Objects.equals(getFilePath(), that.getFilePath()) + && Objects.equals(getColumnName(), that.getColumnName()) + && Objects.equals(getMinValue(), that.getMinValue()) + && Objects.equals(getMaxValue(), that.getMaxValue()); + } + + @Override + public int hashCode() { + return Objects.hash(getFilePath(), getColumnName(), getMinValue(), getMaxValue()); + } + + @Override + public String toString() { + return "HoodieColumnRangeMetadata{" + + "filePath='" + filePath + '\'' + + ", columnName='" + columnName + '\'' + + ", minValue=" + minValue + + ", maxValue=" + maxValue + '}'; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index ebe361025991c..5020a73070409 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieIOException; @@ -41,12 +42,14 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** * Utility functions involving with parquet. @@ -179,6 +182,61 @@ public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { } return footer; } + + /** + * Parse min/max statistics stored in parquet footers for all columns. + */ + public Collection> readRangeFromParquetMetadata(Configuration conf, String partitionPath, Path parquetFilePath) { + + ParquetMetadata metadata = readMetadata(conf, parquetFilePath); + // collect stats from all parquet blocks + Map>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> { + return blockMetaData.getColumns().stream().map(columnChunkMetaData -> + new HoodieColumnRangeMetadata<>(partitionPath + "/" + parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), + columnChunkMetaData.getStatistics().genericGetMin(), + columnChunkMetaData.getStatistics().genericGetMax()) + ); + }).collect(Collectors.groupingBy(e -> e.getColumnName())); + + // we only intend to keep file level statistics (not per-block level statistics in index.
In future, we can change to store row group level stats improve performance even more) + // So reduce above map to Map + return new ArrayList<>(columnToStatsListMap.values().stream() + .map(blocks -> getColumnRangeInFile(blocks)) + .collect(Collectors.toList())); + } + + private HoodieColumnRangeMetadata getColumnRangeInFile(final List> blockRanges) { + if (blockRanges.size() == 1) { + // only one block in parquet file. we can just return that range. + return blockRanges.get(0); + } else { + // there are multiple blocks. Compute min(block_mins) and max(block_maxs) + return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, b2)).get(); + } + } + + private HoodieColumnRangeMetadata combineRanges(HoodieColumnRangeMetadata range1, + HoodieColumnRangeMetadata range2) { + final Comparable minValue; + final Comparable maxValue; + if (range1.getMinValue() != null && range2.getMinValue() != null) { + minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue(); + } else if (range1.getMinValue() == null) { + minValue = range2.getMinValue(); + } else { + minValue = range1.getMinValue(); + } + + if (range1.getMaxValue() != null && range2.getMaxValue() != null) { + maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue(); + } else if (range1.getMaxValue() == null) { + maxValue = range2.getMaxValue(); + } else { + maxValue = range1.getMaxValue(); + } + + return new HoodieColumnRangeMetadata<>(range1.getFilePath(), range1.getColumnName(), minValue, maxValue); + } /** * Get the schema of the given parquet file. diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index b954e57e77c7e..9d424bad8958c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -234,6 +236,31 @@ public Option getRecordByKey(String key, Schema readerSchema) throws IOException return Option.empty(); } + + public Map getRecordsInRange(String startKey, String endKey) throws IOException { + HFileScanner scanner = reader.getScanner(false, true); + KeyValue kv = new KeyValue(startKey.getBytes(), null, null, null); + if (scanner.seekBefore(kv)) { + scanner.next(); + } else if (!scanner.seekTo()) { + return Collections.emptyMap(); + } + + Map keyToValueMap = new HashMap<>(); + while (true) { + R record = getRecordFromCell(scanner.getKeyValue(), getSchema(), getSchema()); + String recordKey = record.get(0).toString(); + if (recordKey.compareTo(endKey) > 0) { + return keyToValueMap; + } + keyToValueMap.put(recordKey, record); + if (!scanner.next()) { + break; + } + } + + return keyToValueMap; + } private R getRecordFromCell(Cell c, Schema writerSchema, Schema readerSchema) throws IOException { byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 9c6eb89b909f8..fd1b0c276e007 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -20,6 +20,8 @@ import org.apache.hudi.avro.model.HoodieMetadataFileInfo; import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.avro.model.HoodieRangeIndexInfo; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -36,6 +38,8 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,10 +69,12 @@ public class HoodieMetadataPayload implements HoodieRecordPayload filesystemMetadata = null; + private HoodieRangeIndexInfo rangeIndexMetadata = null; public HoodieMetadataPayload(Option record) { if (record.isPresent()) { @@ -83,6 +89,12 @@ public HoodieMetadataPayload(Option record) { filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long)v.get("size"), (Boolean)v.get("isDeleted"))); }); } + + if (record.get().get("rangeIndexMetadata") != null) { + GenericRecord v = (GenericRecord) record.get().get("rangeIndexMetadata"); + rangeIndexMetadata = new HoodieRangeIndexInfo(String.valueOf(v.get("columnName")), String.valueOf(v.get("filePath")), + String.valueOf(v.get("rangeLow")), String.valueOf(v.get("rangeHigh")), (Boolean) v.get("isDeleted")); + } } } @@ -92,6 +104,16 @@ private HoodieMetadataPayload(String key, int type, Map filesystemMetadata, + HoodieRangeIndexInfo rangeIndexInfo) { + this.key = key; + this.type = type; + this.filesystemMetadata = filesystemMetadata; + this.rangeIndexMetadata = rangeIndexInfo; + } + + /** * Create and return a {@code HoodieMetadataPayload} to save list of partitions. * @@ -132,17 +154,21 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { "Cannot combine " + previousRecord.type + " with " + type); Map combinedFileInfo = null; + HoodieRangeIndexInfo combinedRangeInfo = null; switch (type) { case PARTITION_LIST: case FILE_LIST: combinedFileInfo = combineFilesystemMetadata(previousRecord); break; + case RANGE_INDEX: + combinedRangeInfo = combineRangeMetadata(previousRecord); + break; default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } - return new HoodieMetadataPayload(key, type, combinedFileInfo); + return new HoodieMetadataPayload(key, type, combinedFileInfo, combinedRangeInfo); } @Override @@ -158,7 +184,7 @@ public Option getInsertValue(Schema schema) throws IOException { return Option.empty(); } - HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata); + HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata, rangeIndexMetadata); return Option.of(record); } @@ -224,6 +250,47 @@ private Map combineFilesystemMetadata(HoodieMeta return combinedFileInfo; } + private HoodieRangeIndexInfo combineRangeMetadata(HoodieMetadataPayload previousRecord) { + // files are immutable and range is not expected to change. TODO Figure out deletions + return previousRecord.rangeIndexMetadata; + } + + /** + * Create and return a {@code HoodieMetadataPayload} to save list of ranges for given set of files. 
+ */ + public static Stream> createRangeRecords(String filePath, Collection> columnRangeInfo) { + return columnRangeInfo.stream().map(columnRange -> { + HoodieKey key = new HoodieKey(getRangeRecordKey(columnRange), MetadataPartitionType.RANGE_INDEX.partitionPath()); + + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), RANGE_INDEX, Collections.emptyMap(), + HoodieRangeIndexInfo.newBuilder() + //TODO: we are storing range for all columns as string. add support for other primitive types + // also if min/max is null, we store null for these columns. Should we consider storing String "null" instead? + .setRangeHigh(columnRange.getMaxValue() == null ? null : columnRange.getMaxValue().toString()) + .setRangeLow(columnRange.getMinValue() == null ? null : columnRange.getMinValue().toString()) + .setColumnName(columnRange.getColumnName()) + .setFilePath(filePath) + .setIsDeleted(false) + .build()); + + return new HoodieRecord<>(key, payload); + }); + } + + // get record key from range metadata + public static String getRangeRecordKey(HoodieColumnRangeMetadata columnRange) { + return "column||" + columnRange.getColumnName() + ";;path||" + columnRange.getFilePath(); + } + + // parse attribute in record key. TODO: find better way to get this attribute instead of parsing key + public static String getAttributeFromRecordKey(String recordKey, String attribute) { + String[] attributeNameValuePairs = recordKey.split(";;"); + return Arrays.stream(attributeNameValuePairs) + .filter(nameValue -> nameValue.startsWith(attribute)) + .findFirst() + .map(s -> s.split("\\|\\|")[1]).orElse(null); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 594231225a6d7..52f066964f064 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -22,16 +22,22 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.Path; @@ -40,6 +46,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -47,6 +54,7 @@ import java.util.Map; import
java.util.function.BiFunction; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ -65,7 +73,7 @@ public class HoodieTableMetadataUtil { * @return a list of metadata table records * @throws IOException */ - public static Option> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option lastSyncTs) throws IOException { + public static Option> convertInstantToFilesMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option lastSyncTs) throws IOException { HoodieTimeline timeline = datasetMetaClient.getActiveTimeline(); Option> records = Option.empty(); ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); @@ -108,6 +116,50 @@ public static Option> convertInstantToMetaRecords(HoodieTable return records; } + /** + * Converts a timeline instant to metadata table records. + * + * @param datasetMetaClient The meta client associated with the timeline instant + * @param instant to fetch and convert to metadata table records + * @return a list of metadata table records + * @throws IOException + */ + public static List convertInstantToRangeMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option lastSyncTs) throws IOException { + HoodieTimeline timeline = datasetMetaClient.getActiveTimeline(); + List records = Collections.emptyList(); + ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); + + switch (instant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: + //TODO + throw new HoodieException("clean not supported for range index yet"); + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: + case HoodieTimeline.REPLACE_COMMIT_ACTION: + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + Map>> fileToColumnRangeInfo = + createRangeIndexInfoFromWriteStats(datasetMetaClient, commitMetadata.getPartitionToWriteStats().values().stream().flatMap(e -> e.stream())); + records = convertMetadataToRecords(fileToColumnRangeInfo, instant.getTimestamp(), lastSyncTs); + break; + case HoodieTimeline.ROLLBACK_ACTION: + //TODO + throw new HoodieException("rollback not supported for range index yet"); + case HoodieTimeline.RESTORE_ACTION: + //TODO + throw new HoodieException("restore not supported for range index yet"); + case HoodieTimeline.SAVEPOINT_ACTION: + // Nothing to be done here + break; + default: + throw new HoodieException("Unknown type of action " + instant.getAction()); + } + + return records; + } + + /** * Finds all new files/partitions created as part of commit and creates metadata table records for them. * @@ -340,4 +392,52 @@ private static List convertFilesToRecords(Map return records; } + + public static List convertMetadataToRecords(Map>> fileToColumnRangeInfo, + String instantTime, + Option lastSyncTs) { + + List rangeRecords = fileToColumnRangeInfo.entrySet().stream().flatMap(fileRangeEntry -> + HoodieMetadataPayload.createRangeRecords(fileRangeEntry.getKey(), fileRangeEntry.getValue()) + ).collect(Collectors.toList()); + + LOG.info("Creating " + rangeRecords.size() + " records for column ranges from " + fileToColumnRangeInfo.keySet().size() + + " files created as part of instant " + instantTime + ". 
Last metadata sync: " + lastSyncTs); + return rangeRecords; + } + + public static Map>> createRangeIndexInfoFromWriteStats(HoodieEngineContext engineContext, + HoodieTableMetaClient datasetMetaClient, + List allWriteStats) { + if (allWriteStats.isEmpty()) { + return Collections.emptyMap(); + } + + return engineContext.mapToPair(allWriteStats, writeStat -> translateStatToRangeIndex(writeStat, datasetMetaClient), allWriteStats.size()); + } + + public static ImmutablePair>> translateStatToRangeIndex(HoodieWriteStat writeStat, + HoodieTableMetaClient datasetMetaClient) { + Collection> rangeMetadataCollection = + HoodieTableMetadataUtil.getRangeStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient); + return ImmutablePair.of(writeStat.getPath(), rangeMetadataCollection); + } + + public static Map>> createRangeIndexInfoFromWriteStats( + HoodieTableMetaClient datasetMetaClient, Stream writeStats) { + + return writeStats.flatMap(writeStat -> getRangeStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient).stream()) + .collect(Collectors.groupingBy(range -> range.getFilePath(), + Collectors.mapping(range -> range, Collectors.toCollection(ArrayList::new)))); + } + + public static Collection> getRangeStats(final String partitionpath, final String path, HoodieTableMetaClient datasetMetaClient) { + if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + //TODO: we are capturing range for most columns in the schema. Should we have allowList/denyList to reduce storage size of index? + return new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), partitionpath, new Path(datasetMetaClient.getBasePath(), path)); + } else { + throw new HoodieException("range index not supported for path " + path); + } + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 0436de707d2fd..c57c9c56b1641 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -19,7 +19,8 @@ package org.apache.hudi.metadata; public enum MetadataPartitionType { - FILES("files"); + FILES("files"), + RANGE_INDEX("rangeIndex"); private final String partitionPath; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java index 9ba3f26079d14..8999b984927b2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java @@ -73,7 +73,7 @@ public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, List> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs); + Option> records = HoodieTableMetadataUtil.convertInstantToFilesMetaRecords(metaClient, instant, lastSyncTs); if (records.isPresent()) { records.get().forEach(record -> processNextRecord(record)); }
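
Usage sketch (illustrative, not part of the patch): how a writer application would turn the new range index on. withRangeIndexEnabled() is the builder method added in HoodieIndexConfig above; the remaining builder calls are pre-existing Hudi APIs, and basePath/tableName are placeholder values. Users do not need to set the sorting flag themselves, since createMetadataWriteConfig() above enables withRequiresSorting(true) internally for the metadata table.

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class RangeIndexConfigExample {
      // Build a write config for the data table with the range index enabled.
      public static HoodieWriteConfig buildWriteConfig(String basePath, String tableName) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)                          // base path of the data table
            .forTable(tableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withRangeIndexEnabled(true)             // hoodie.range.index.enabled
                .build())
            .build();
      }
    }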
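
Round-trip sketch of the range-index key layout used by HoodieMetadataPayload and prepRangeIndexRecords() above: keys take the form "column||<columnName>;;path||<relativeFilePath>", and a deterministic file id prefix is derived from the 'column' attribute so that all range records of one column land in the same file group of the rangeIndex partition. The helper methods are the ones added in this patch; the column name, file path and values below are made-up examples.

    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
    import org.apache.hudi.metadata.HoodieMetadataPayload;

    public class RangeKeyLayoutExample {
      public static void main(String[] args) {
        HoodieColumnRangeMetadata<String> range =
            new HoodieColumnRangeMetadata<>("2021/06/01/abc123.parquet", "fare", "1.0", "99.5");

        // key = "column||fare;;path||2021/06/01/abc123.parquet"
        String key = HoodieMetadataPayload.getRangeRecordKey(range);

        // parses the 'column' attribute back out of the key -> "fare"
        String column = HoodieMetadataPayload.getAttributeFromRecordKey(key, "column");

        // stable UUID-based prefix: the same column always maps to the same metadata file group
        String fileIdPfx = FSUtils.createFileIdPfxFromKey(column);
        System.out.println(key + " -> " + fileIdPfx);
      }
    }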
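
The query-side consumer of this index is not part of this change, so the following is only a sketch of the intended pruning logic under that assumption: a data file is kept as a candidate when its stored [min, max] interval for the filtered column overlaps the queried interval. Values are compared as strings, matching the avro schema above, which stores rangeLow/rangeHigh as strings; the class and method names here are hypothetical.

    import org.apache.hudi.common.model.HoodieColumnRangeMetadata;

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RangeIndexPruner {
      // Returns the subset of files whose column range may contain values in [low, high].
      public static List<String> candidateFiles(Collection<HoodieColumnRangeMetadata<String>> ranges,
                                                String column, String low, String high) {
        return ranges.stream()
            .filter(r -> r.getColumnName().equals(column))
            // keep files with unknown (null) bounds conservatively; otherwise require interval overlap
            .filter(r -> r.getMinValue() == null || r.getMaxValue() == null
                || (r.getMaxValue().compareTo(low) >= 0 && r.getMinValue().compareTo(high) <= 0))
            .map(HoodieColumnRangeMetadata::getFilePath)
            .collect(Collectors.toList());
      }
    }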