@@ -22,9 +22,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -23,6 +23,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -25,6 +25,7 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -52,7 +53,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -49,6 +49,7 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieMetadataFileSystemView;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -62,7 +63,6 @@
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -271,7 +271,8 @@ public SyncableFileSystemView getHoodieView() {
private SyncableFileSystemView getFileSystemViewInternal(HoodieTimeline timeline) {
if (config.useFileListingMetadata()) {
FileSystemViewStorageConfig viewConfig = config.getViewStorageConfig();
return new HoodieMetadataFileSystemView(metaClient, this, timeline, viewConfig.isIncrementalTimelineSyncEnabled());
return new HoodieMetadataFileSystemView(metaClient, this.metadata(), timeline,
viewConfig.isIncrementalTimelineSyncEnabled());
} else {
return getViewManager().getFileSystemView(metaClient);
}
@@ -37,6 +37,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -58,7 +59,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -16,9 +16,7 @@
* limitations under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.DefaultHoodieConfig;
package org.apache.hudi.common.config;

import javax.annotation.concurrent.Immutable;

@@ -31,13 +29,16 @@
* Configurations used by the HUDI Metadata Table.
*/
@Immutable
public class HoodieMetadataConfig extends DefaultHoodieConfig {
public final class HoodieMetadataConfig extends DefaultHoodieConfig {

public static final String METADATA_PREFIX = "hoodie.metadata";

// Enable the internal Metadata Table which saves file listings
public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
public static final boolean DEFAULT_METADATA_ENABLE = false;
// We can set the default to true for readers, as it will internally default to listing from filesystem if metadata
// table is not found
public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;

// Validate contents of Metadata Table on each access against the actual filesystem
public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
@@ -74,7 +75,6 @@ public static HoodieMetadataConfig.Builder newBuilder() {
}

public static class Builder {

private final Properties props = new Properties();

public Builder fromFile(File propertiesFile) throws IOException {
@@ -147,5 +147,4 @@ public HoodieMetadataConfig build() {
return config;
}
}

}
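
A minimal usage sketch for the relocated config class, using the constants and the newBuilder()/build() methods visible in this hunk; the fromProperties(Properties) builder method is an assumption, mirroring the fromFile(File) method shown above.

import java.util.Properties;
import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigSketch {
  public static HoodieMetadataConfig buildConfig() {
    Properties props = new Properties();
    // Writer-side default is false; enable the internal metadata table explicitly.
    props.setProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP, "true");
    // Optionally cross-check metadata-table listings against the actual filesystem.
    props.setProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP, "false");
    return HoodieMetadataConfig.newBuilder()
        .fromProperties(props) // assumed, by analogy with fromFile(...) above
        .build();
  }
}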
@@ -158,6 +158,21 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(Serializab
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
}

public static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieTableMetaClient metaClient,
boolean useFileListingFromMetadata,
boolean verifyListings) {
LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath());
if (useFileListingFromMetadata) {
return new HoodieMetadataFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
true,
verifyListings);
}

return new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
}
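
  // Illustrative sketch only (not part of this change): how a reader might call
  // the overload above. Flipping the first flag reverts to plain filesystem
  // listing; the second controls verification of metadata-table listings.
  private static HoodieTableFileSystemView exampleMetadataBackedView(HoodieTableMetaClient metaClient) {
    return createInMemoryFileSystemView(metaClient, true /* useFileListingFromMetadata */,
        false /* verifyListings */);
  }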

/**
* Create a remote file System view for a table.
*
@@ -16,27 +16,37 @@
* limitations under the License.
*/

package org.apache.hudi.metadata;

import java.io.IOException;
package org.apache.hudi.common.table.view;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.metadata.HoodieTableMetadata;

import java.io.IOException;

/**
* {@code HoodieTableFileSystemView} implementation that retrieves partition listings from the Metadata Table.
*/
public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
private HoodieTable hoodieTable;

public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTable table,
private final HoodieTableMetadata tableMetadata;

public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTableMetadata tableMetadata,
HoodieTimeline visibleActiveTimeline, boolean enableIncrementalTimelineSync) {
super(metaClient, visibleActiveTimeline, enableIncrementalTimelineSync);
this.hoodieTable = table;
this.tableMetadata = tableMetadata;
}

public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActiveTimeline,
boolean useFileListingFromMetadata,
boolean verifyListings) {
super(metaClient, visibleActiveTimeline);
this.tableMetadata = HoodieTableMetadata.create(metaClient.getHadoopConf(), metaClient.getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, useFileListingFromMetadata, verifyListings,
false, false);
}

/**
@@ -47,6 +57,6 @@ public HoodieMetadataFileSystemView(HoodieTableMetaClient metaClient, HoodieTabl
*/
@Override
protected FileStatus[] listPartition(Path partitionPath) throws IOException {
return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
return tableMetadata.getAllFilesInPartition(partitionPath);
}
}
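
A sketch of the reader-side construction this decoupling enables, using only members visible in this diff; an existing HoodieTableMetaClient is assumed.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieMetadataFileSystemView;

public class MetadataViewSketch {
  public static HoodieMetadataFileSystemView openView(HoodieTableMetaClient metaClient) {
    HoodieTimeline completed = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    // The four-argument constructor creates its own HoodieTableMetadata handle,
    // so the read path no longer needs a write-path HoodieTable instance.
    return new HoodieMetadataFileSystemView(metaClient, completed,
        true /* useFileListingFromMetadata */, false /* verifyListings */);
  }
}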
@@ -85,7 +85,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata {
private final String spillableMapDirectory;

// Readers for the base and log file which store the metadata
private transient HoodieFileReader<GenericRecord> basefileReader;
private transient HoodieFileReader<GenericRecord> baseFileReader;
private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;

public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory,
@@ -287,9 +287,9 @@ private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String

// Retrieve record from base file
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
if (basefileReader != null) {
if (baseFileReader != null) {
HoodieTimer timer = new HoodieTimer().startTimer();
Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
metaClient.getTableConfig().getPayloadClass());
@@ -338,7 +338,7 @@ private synchronized void openBaseAndLogFiles() throws IOException {
Option<HoodieBaseFile> basefile = latestSlices.get(0).getBaseFile();
if (basefile.isPresent()) {
String basefilePath = basefile.get().getPath();
basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
}

@@ -365,9 +365,9 @@ private synchronized void openBaseAndLogFiles() throws IOException {
}

protected void closeReaders() {
if (basefileReader != null) {
basefileReader.close();
basefileReader = null;
if (baseFileReader != null) {
baseFileReader.close();
baseFileReader = null;
}
logRecordScanner = null;
}
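
  // Illustrative sketch only (not part of this change): direct use of the
  // factory on the HoodieTableMetadata interface this class implements. The
  // create(...) signature matches the call added in HoodieMetadataFileSystemView
  // above; the semantics of the two trailing boolean flags are assumed and they
  // are simply passed through as false here.
  private static FileStatus[] exampleListPartition(HoodieTableMetaClient metaClient, Path partitionPath)
      throws IOException {
    HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
        metaClient.getHadoopConf(), metaClient.getBasePath(),
        FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR,
        true /* useFileListingFromMetadata */, false /* verifyListings */,
        false, false);
    return tableMetadata.getAllFilesInPartition(partitionPath);
  }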
@@ -22,9 +22,11 @@
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -43,6 +45,11 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;

/**
* Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
* always accept
@@ -163,9 +170,13 @@ public boolean accept(Path path) {
metaClientCache.put(baseDir.toString(), metaClient);
}

HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder));
List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
boolean useFileListingFromMetadata = getConf().getBoolean(METADATA_ENABLE_PROP, DEFAULT_METADATA_ENABLE_FOR_READERS);
boolean verifyFileListing = getConf().getBoolean(METADATA_VALIDATE_PROP, DEFAULT_METADATA_VALIDATE);
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(metaClient,
useFileListingFromMetadata, verifyFileListing);
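// Illustrative note (not part of this change): both booleans above come from
// the Hadoop Configuration backing this filter, so a query can opt out with
// conf.setBoolean(METADATA_ENABLE_PROP, false). Readers default to enabled
// because the view falls back to a plain filesystem listing when no metadata
// table exists.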
String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);

List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
// populate the cache
if (!hoodiePathCache.containsKey(folder.toString())) {
hoodiePathCache.put(folder.toString(), new HashSet<>());
@@ -17,6 +17,7 @@

package org.apache.hudi.functional

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -26,6 +27,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.collection.JavaConversions._

@@ -71,21 +74,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}

@Test def testCopyOnWriteStorage() {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testCopyOnWriteStorage(isMetadataEnabled: Boolean) {
// Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)

assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)

// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
val snapshotDF1 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
.load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF1.count())

val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
@@ -95,6 +103,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
// Upsert Operation
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)

@@ -103,6 +112,7 @@

// Snapshot Query
val snapshotDF2 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
.load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF2.count()) // still 100, since we only updated

@@ -124,6 +134,7 @@ val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
emptyDF.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.METADATA_ENABLE_PROP, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
