diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 146a98d6669a9..158b001713067 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -22,9 +22,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 293c3c00f83bb..484880cc9f51b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1454d34847d1f..45e4806964b87 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -52,7 +53,6 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0696ad0898593..ed6d35acaa164 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -49,6 +49,7 @@
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieMetadataFileSystemView;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -62,7 +63,6 @@
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -271,7 +271,8 @@ public SyncableFileSystemView getHoodieView() {
   private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
     if (config.useFileListingMetadata()) {
       FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
-      return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
+      return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline,
+          viewConfig.isIncrementalTimelineSyncEnabled());
     } else {
       return getViewManager().getFileSystemView(metaClient);
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index b200d77f6ed2d..edd83594c1396 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -58,7 +59,6 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
similarity index 94%
rename from hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
rename to hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 53142b144e9dd..cc6d89d087765 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.config;
-
-import org.apache.hudi.common.config.DefaultHoodieConfig;
+package org.apache.hudi.common.config;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -31,13 +29,16 @@
  * Configurations used by the HUDI Metadata Table.
 */
 @Immutable
-public class HoodieMetadataConfig extends DefaultHoodieConfig {
+public final class HoodieMetadataConfig extends DefaultHoodieConfig {
 
   public static final String METADATA_PREFIX = "hoodie.metadata";
 
   // Enable the internal Metadata Table which saves file listings
   public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
   public static final boolean DEFAULT_METADATA_ENABLE = false;
+  // We can set the default to true for readers, as it will internally default to listing from filesystem if metadata
+  // table is not found
+  public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
 
   // Validate contents of Metadata Table on each access against the actual filesystem
   public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
@@ -74,7 +75,6 @@ public static HoodieMetadataConfig.Builder newBuilder() {
   }
 
   public static class Builder {
-
     private final Properties props = new Properties();
 
     public Builder fromFile(File propertiesFile) throws IOException {
@@ -147,5 +147,4 @@ public HoodieMetadataConfig build() {
       return config;
     }
   }
-
 }
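With HoodieMetadataConfig relocated into hudi-common, writer and reader code can reference the same property keys. Below is a minimal sketch of setting those keys by hand; the MetadataConfigExample class name is purely illustrative, and handing the resulting Properties to a concrete HoodieWriteConfig builder is outside the scope of this hunk.

```java
import java.util.Properties;

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigExample {
  public static Properties metadataProps() {
    Properties props = new Properties();
    // Writer side: the metadata table stays off by default (DEFAULT_METADATA_ENABLE = false).
    props.setProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true");
    // Optionally cross-check metadata-table listings against the actual filesystem.
    props.setProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP, "false");
    return props;
  }
}
```

Readers get the separate DEFAULT_METADATA_ENABLE_FOR_READERS (true) default, which is safe because the lookup falls back to a filesystem listing when the metadata table does not exist.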
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index d31018123c3a5..b6b4859b991b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -158,6 +158,21 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(Serializab
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
   }
 
+  public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient,
+                                                                       boolean useFileListingFromMetadata,
+                                                                       boolean verifyListings) {
+    LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
+    if (useFileListingFromMetadata) {
+      return new HoodieMetadataFileSystemView(metaClient,
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+          true,
+          verifyListings);
+    }
+
+    return new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+  }
+
   /**
    * Create a remote file System view for a table.
    *
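The new createInMemoryFileSystemView overload gives read-side code a one-liner for a view that can be backed by the metadata table. A sketch of a caller, assuming the pre-builder HoodieTableMetaClient(Configuration, String) constructor used elsewhere in this codebase; the MetadataViewExample class name is illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

public class MetadataViewExample {
  public static List<HoodieBaseFile> latestBaseFiles(Configuration hadoopConf, String basePath, String partition) {
    // Assumes the plain (Configuration, String) meta client constructor is available on this branch.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
    // useFileListingFromMetadata = true, verifyListings = false
    HoodieTableFileSystemView fsView =
        FileSystemViewManager.createInMemoryFileSystemView(metaClient, true, false);
    return fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
  }
}
```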
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieMetadataFileSystemView.java
similarity index 66%
rename from hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieMetadataFileSystemView.java
index 8c23ea830f2ff..76c827f4cb6c2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieMetadataFileSystemView.java
@@ -16,27 +16,37 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.metadata;
-
-import java.io.IOException;
+package org.apache.hudi.common.table.view;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import java.io.IOException;
 
 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
  */
 public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
-  private HoodieTable hoodieTable;
-  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
+
+  private final HoodieTableMetadata tableMetadata;
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata,
                                       HoodieTimeline visibleActiveTimeline,
                                       boolean enableIncrementalTimelineSync) {
     super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
-    this.hoodieTable = table;
+    this.tableMetadata = tableMetadata;
+  }
+
+  public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient,
+                                      HoodieTimeline visibleActiveTimeline,
+                                      boolean useFileListingFromMetadata,
+                                      boolean verifyListings) {
+    super(metaClient, visibleActiveTimeline);
+    this.tableMetadata = HoodieTableMetadata.create(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings,
+        false, false);
   }
 
   /**
@@ -47,6 +57,6 @@ public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTabl
    */
   @Override
   protected FileStatus[] listPartition(Path partitionPath) throws IOException {
-    return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
+    return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 6cc6144c337ae..ca7c27a449101 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -85,7 +85,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
   private final String spillableMapDirectory;
 
   // Readers for the base and log file which store the metadata
-  private transient HoodieFileReader basefileReader;
+  private transient HoodieFileReader baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
 
   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
@@ -287,9 +287,9 @@ private Option> getMergedRecordByKey(String
 
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (basefileReader != null) {
+    if (baseFileReader != null) {
       HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
+      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
@@ -338,7 +338,7 @@ private synchronized void openBaseAndLogFiles() throws IOException {
     Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
     if (basefile.isPresent()) {
       String basefilePath = basefile.get().getPath();
-      basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+      baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
       LOG.info("Opened metadata base file from " + basefilePath + " at instant "
           + basefile.get().getCommitTime());
     }
@@ -365,9 +365,9 @@ private synchronized void openBaseAndLogFiles() throws IOException {
   }
 
   protected void closeReaders() {
-    if (basefileReader != null) {
-      basefileReader.close();
-      basefileReader = null;
+    if (baseFileReader != null) {
+      baseFileReader.close();
+      baseFileReader = null;
     }
     logRecordScanner = null;
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 1e616f896bb30..78606f4652894 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -22,9 +22,11 @@
 import java.util.Set;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -43,6 +45,11 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -163,9 +170,13 @@ public boolean accept(Path path) {
           metaClientCache.put(baseDir.toString(), metaClient);
         }
 
-        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
-            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder));
-        List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
+        boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
+        boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
+        HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+        String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
+
+        List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
         // populate the cache
         if (!hoodiePathCache.containsKey(folder.toString())) {
           hoodiePathCache.put(folder.toString(), new HashSet<>());
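HoodieROTablePathFilter now derives both flags from its Hadoop Configuration instead of always listing the folder, so a query engine only has to set two boolean properties. A short sketch; PathFilterConfExample is an illustrative name.

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;

public class PathFilterConfExample {
  public static Configuration withMetadataListing(Configuration conf) {
    // accept() reads these via getConf().getBoolean(...), falling back to
    // DEFAULT_METADATA_ENABLE_FOR_READERS and DEFAULT_METADATA_VALIDATE when unset.
    conf.setBoolean(METADATA_ENABLE_PROP, true);
    conf.setBoolean(METADATA_VALIDATE_PROP, false);
    return conf;
  }
}
```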
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c1e45c3593f49..52e268d73cd03 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -26,6 +27,8 @@
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.JavaConversions._
@@ -71,13 +74,16 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
-  @Test def testCopyOnWriteStorage() {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
     // Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -85,7 +91,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
     // Snapshot query
-    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
+      .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF1.count())
 
     val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
@@ -95,6 +103,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     // Upsert Operation
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
      .mode(SaveMode.Append)
      .save(basePath)
 
@@ -103,6 +112,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     // Snapshot Query
     val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .load(basePath + "/*/*/*/*")
     assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
 
@@ -124,6 +134,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
       .mode(SaveMode.Append)
       .save(basePath)
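The parameterized test above toggles the metadata table through a datasource option. The same toggle from a plain Java Spark job might look roughly like this; it assumes an existing SparkSession and input Dataset, and omits the other required write options (record key, precombine field, table name) that the test supplies via commonOpts.

```java
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class DataSourceMetadataExample {
  public static long writeAndCount(SparkSession spark, Dataset<Row> df, String basePath) {
    // Write with the internal metadata table enabled.
    df.write().format("org.apache.hudi")
        .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")
        .mode(SaveMode.Append)
        .save(basePath);

    // Read back with file listing served from the metadata table as well.
    return spark.read().format("org.apache.hudi")
        .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true")
        .load(basePath + "/*/*/*/*")
        .count();
  }
}
```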