diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 33f1dd6204bc..73fd6685539f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -121,16 +121,16 @@ public void open(Configuration parameters) throws Exception {
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        ignoreSmallFiles(writeConfig),
+        ignoreSmallFiles(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
-  private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+  private boolean ignoreSmallFiles() {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
-    return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+    return WriteOperationType.isOverwrite(operationType);
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index f9d5b1c1faa9..e73890f3b54b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
@@ -188,6 +187,7 @@ private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
       smallFileAssignMap.put(partitionPath, assign);
       return assign;
     }
+    smallFileAssignMap.put(partitionPath, null);
     return null;
   }
 
@@ -211,10 +211,6 @@ public synchronized void reload(long checkpointId) {
     this.writeProfile.reload(checkpointId);
   }
 
-  public HoodieTable getTable() {
-    return this.writeProfile.getTable();
-  }
-
   private boolean fileIdOfThisTask(String fileId) {
     // the file id can shuffle to this task
     return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 922c056d259d..1f08c5a4a5f2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -25,7 +25,7 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.commit.SmallFile;
 
@@ -55,13 +55,11 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      // find smallest file in partition and append to it
+      // find the smallest file in partition and append to it
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
           .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
@@ -91,8 +89,8 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     return smallFileLocations;
   }
 
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getSliceView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) this.table.getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 1171a54cde92..98eb29cdeb4b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -23,9 +23,10 @@
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
@@ -91,11 +92,6 @@ public class WriteProfile {
    */
   private long reloadedCheckpointId;
 
-  /**
-   * The file system view cache for one checkpoint interval.
-   */
-  protected AbstractTableFileSystemView fsView;
-
   /**
    * Metadata cache to reduce IO of metadata files.
   */
@@ -120,8 +116,8 @@ public long getRecordsPerBucket() {
     return recordsPerBucket;
   }
 
-  public HoodieTable getTable() {
-    return table;
+  public HoodieTableMetaClient getMetaClient() {
+    return this.table.getMetaClient();
   }
 
   /**
@@ -183,9 +179,7 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      List<HoodieBaseFile> allFiles = fsView
+      List<HoodieBaseFile> allFiles = getFileSystemView()
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -203,15 +197,8 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     return smallFileLocations;
   }
 
-  @VisibleForTesting
-  public void initFileSystemView() {
-    if (fsView == null) {
-      fsView = getFileSystemView();
-    }
-  }
-
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
   }
 
   /**
@@ -245,9 +232,7 @@ public synchronized void reload(long checkpointId) {
       return;
     }
     this.table.getMetaClient().reloadActiveTimeline();
-    this.table.getHoodieView().sync();
     recordProfile();
-    this.fsView = null;
     cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
     this.reloadedCheckpointId = checkpointId;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 28a70b95f3ef..1e39c92e304e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -365,6 +365,7 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
    *
    * <p>This expects to be used by client, the driver should start an embedded timeline server.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
@@ -382,17 +383,20 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
    *
    * <p>The task context supplier is a constant: the write token is always '0-1-0'.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
+    // build the write client to start the embedded timeline server
+    final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
     // create the filesystem view storage properties for client
-    FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
+    final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
     // rebuild the view storage config with simplified options.
     FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
         .withStorageType(viewStorageConfig.getStorageType())
         .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
         .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
     ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
-    return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+    return writeClient;
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 053c2a39c8e0..4f4b5499530c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -401,7 +401,7 @@ public void testWriteProfileMetadataCache() throws Exception {
   }
 
   private static String getLastCompleteInstant(WriteProfile profile) {
-    return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient());
+    return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
   }
 
   private void assertBucketEquals(
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index e00fbfac50ce..57297c50ee82 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -19,9 +19,12 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.ViewStorageProperties;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
@@ -98,5 +101,13 @@ void testInstantTimeDiff() {
     long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
     assertThat(diff, is(75L));
   }
+
+  @Test
+  void testDumpRemoteViewStorageConfig() throws IOException {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.createWriteClient(conf);
+    FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
+    assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
+  }
 }
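The new testDumpRemoteViewStorageConfig above verifies the round trip that this patch sets up: the driver-side createWriteClient starts the embedded timeline server, dumps a simplified remote view storage config under the table path, and the bucket assign tasks can later load it back. Below is a minimal standalone sketch of that dump/load round trip, not part of the patch; the base path, host, and port are hypothetical placeholders, and only the ViewStorageProperties and FileSystemViewStorageConfig calls that appear in the diff are used.

import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.util.ViewStorageProperties;

public class ViewStorageRoundTripSketch {
  public static void main(String[] args) throws Exception {
    String basePath = "/tmp/hudi_demo_table"; // hypothetical table base path

    // Driver side: rebuild the view storage config with simplified options,
    // mirroring what StreamerUtil.createWriteClient does after it builds the
    // write client (which starts the embedded timeline server).
    FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
        .withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
        .withRemoteServerHost("localhost") // placeholder host
        .withRemoteServerPort(26754)       // placeholder port
        .build();
    ViewStorageProperties.createProperties(basePath, rebuilt);

    // Task side: reload the dumped properties, as the new test does. With
    // REMOTE_FIRST the tasks prefer the remote timeline server view and fall
    // back to a secondary view if the server is unreachable.
    FileSystemViewStorageConfig loaded = ViewStorageProperties.loadFromProperties(basePath);
    System.out.println("storage type: " + loaded.getStorageType()); // REMOTE_FIRST
  }
}

This is also why WriteProfile.getFileSystemView() now rebuilds the view from a fresh HoodieFlinkTable instead of caching an fsView field: the tasks resolve their view through the dumped storage config rather than through a locally synced table view.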