Merged
@@ -121,16 +121,16 @@ public void open(Configuration parameters) throws Exception {
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        ignoreSmallFiles(writeConfig),
+        ignoreSmallFiles(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
-  private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+  private boolean ignoreSmallFiles() {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
-    return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+    return WriteOperationType.isOverwrite(operationType);
   }
 
   @Override
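Reading of the hunk above (this looks like the Flink `BucketAssignFunction`): whether small files are ignored is now derived from the operation type alone, so `allowDuplicateInserts()` no longer disables small-file packing. A minimal, self-contained sketch of the decision, with a hypothetical `Op` enum standing in for Hudi's `WriteOperationType`:

```java
// Hypothetical stand-ins for WriteOperationType and the changed helper.
enum Op { INSERT, UPSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

public class IgnoreSmallFilesSketch {

  // Mirrors WriteOperationType.isOverwrite: overwrite operations replace file
  // groups, so appending to existing small files would be wasted work.
  static boolean isOverwrite(Op op) {
    return op == Op.INSERT_OVERWRITE || op == Op.INSERT_OVERWRITE_TABLE;
  }

  // Before this PR: isOverwrite(op) || writeConfig.allowDuplicateInserts().
  // After: the operation type alone decides.
  static boolean ignoreSmallFiles(Op op) {
    return isOverwrite(op);
  }

  public static void main(String[] args) {
    System.out.println(ignoreSmallFiles(Op.INSERT));           // false -> pack small files
    System.out.println(ignoreSmallFiles(Op.INSERT_OVERWRITE)); // true  -> fresh file groups
  }
}
```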
@@ -22,7 +22,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
@@ -188,6 +187,7 @@ private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
       smallFileAssignMap.put(partitionPath, assign);
       return assign;
     }
+    smallFileAssignMap.put(partitionPath, null);
     return null;
   }
 
@@ -211,10 +211,6 @@ public synchronized void reload(long checkpointId) {
     this.writeProfile.reload(checkpointId);
   }
 
-  public HoodieTable<?, ?, ?, ?> getTable() {
-    return this.writeProfile.getTable();
-  }
-
   private boolean fileIdOfThisTask(String fileId) {
     // the file id can shuffle to this task
     return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
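The added `smallFileAssignMap.put(partitionPath, null)` caches the negative result too: a partition known to have no reusable small files is not re-scanned for every incoming record. A self-contained sketch of that negative-caching pattern (hypothetical names; it assumes the lookup is guarded by a `containsKey` check upstream of the lines shown, which is what makes a cached `null` distinguishable from "not yet computed"):

```java
import java.util.HashMap;
import java.util.Map;

public class NegativeCacheSketch {

  private final Map<String, String> cache = new HashMap<>();

  String lookup(String key) {
    // containsKey distinguishes a cached miss from a never-computed entry.
    if (cache.containsKey(key)) {
      return cache.get(key); // may be null: a cached miss, no rescan
    }
    String value = expensiveScan(key);
    cache.put(key, value); // caches null results as well
    return value;
  }

  private String expensiveScan(String key) {
    return key.startsWith("small") ? key + "-assign" : null;
  }

  public static void main(String[] args) {
    NegativeCacheSketch c = new NegativeCacheSketch();
    c.lookup("par1"); // scans, caches null
    c.lookup("par1"); // hit: returns cached null without scanning again
  }
}
```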
@@ -25,7 +25,7 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.commit.SmallFile;
 
@@ -55,13 +55,11 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      // find smallest file in partition and append to it
+      // find the smallest file in partition and append to it
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
           .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
@@ -91,8 +89,8 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     return smallFileLocations;
   }
 
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getSliceView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) this.table.getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
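In this file (apparently the MOR `DeltaWriteProfile`), the cached `fsView` field gives way to calling `getFileSystemView()` at each use, and the accessor is narrowed to `SyncableFileSystemView` so the view can be caught up incrementally instead of being rebuilt each checkpoint. For orientation, a self-contained sketch of the small-file profiling the method performs; the types are hypothetical stand-ins (the real code works on `FileSlice` objects and a configured size threshold):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of small-file profiling: keep the latest slices whose
// total size is under the threshold; new inserts are appended to those
// instead of spawning new file groups.
public class SmallFileSketch {

  static class Slice {
    final String fileId;
    final long totalBytes;

    Slice(String fileId, long totalBytes) {
      this.fileId = fileId;
      this.totalBytes = totalBytes;
    }
  }

  static List<Slice> smallSlices(List<Slice> latestSlices, long thresholdBytes) {
    List<Slice> small = new ArrayList<>();
    for (Slice s : latestSlices) {
      if (s.totalBytes < thresholdBytes) {
        small.add(s); // candidate for appending new inserts
      }
    }
    return small;
  }

  public static void main(String[] args) {
    List<Slice> latest = List.of(new Slice("f1", 10L << 20), new Slice("f2", 200L << 20));
    System.out.println(smallSlices(latest, 100L << 20).get(0).fileId); // f1
  }
}
```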
@@ -23,9 +23,10 @@
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
@@ -91,11 +92,6 @@ public class WriteProfile {
    */
   private long reloadedCheckpointId;
 
-  /**
-   * The file system view cache for one checkpoint interval.
-   */
-  protected AbstractTableFileSystemView fsView;
-
   /**
    * Metadata cache to reduce IO of metadata files.
    */
@@ -120,8 +116,8 @@ public long getRecordsPerBucket() {
     return recordsPerBucket;
   }
 
-  public HoodieTable<?, ?, ?, ?> getTable() {
-    return table;
+  public HoodieTableMetaClient getMetaClient() {
+    return this.table.getMetaClient();
   }
 
   /**
@@ -183,9 +179,7 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      List<HoodieBaseFile> allFiles = fsView
+      List<HoodieBaseFile> allFiles = getFileSystemView()
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -203,15 +197,8 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
     return smallFileLocations;
   }
 
-  @VisibleForTesting
-  public void initFileSystemView() {
-    if (fsView == null) {
-      fsView = getFileSystemView();
-    }
-  }
-
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
   }
 
   /**
@@ -245,9 +232,7 @@ public synchronized void reload(long checkpointId) {
       return;
     }
     this.table.getMetaClient().reloadActiveTimeline();
+    this.table.getHoodieView().sync();
     recordProfile();
-    this.fsView = null;
     cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
     this.reloadedCheckpointId = checkpointId;
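With the `fsView` cache removed from `WriteProfile`, per-checkpoint `reload` now reloads the active timeline and then syncs the shared Hoodie view in place (`getHoodieView().sync()`), instead of nulling a snapshot out to be lazily rebuilt. A minimal sketch of that reload protocol, with hypothetical `Timeline`/`View` interfaces standing in for Hudi's:

```java
// Hypothetical sketch of the reload flow after this change: a per-checkpoint
// reload syncs the shared view in place instead of dropping a cached snapshot.
public class ReloadSketch {

  interface Timeline { void reload(); }
  interface View { void sync(); }

  private final Timeline timeline;
  private final View sharedView;
  private long reloadedCheckpointId = -1;

  ReloadSketch(Timeline timeline, View sharedView) {
    this.timeline = timeline;
    this.sharedView = sharedView;
  }

  synchronized void reload(long checkpointId) {
    if (checkpointId <= reloadedCheckpointId) {
      return; // already reloaded for this checkpoint
    }
    timeline.reload();  // refresh the active timeline first
    sharedView.sync();  // then catch the view up incrementally
    reloadedCheckpointId = checkpointId;
  }

  public static void main(String[] args) {
    ReloadSketch profile = new ReloadSketch(
        () -> System.out.println("timeline reloaded"),
        () -> System.out.println("view synced"));
    profile.reload(1L);
    profile.reload(1L); // no-op: already reloaded for checkpoint 1
  }
}
```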
@@ -365,6 +365,7 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
    *
    * <p>This expects to be used by client, the driver should start an embedded timeline server.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
     HoodieFlinkEngineContext context =
         new HoodieFlinkEngineContext(
@@ -382,17 +383,20 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
    *
    * <p>The task context supplier is a constant: the write token is always '0-1-0'.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
+    // build the write client to start the embedded timeline server
+    final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
     // create the filesystem view storage properties for client
-    FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
+    final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
     // rebuild the view storage config with simplified options.
     FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
         .withStorageType(viewStorageConfig.getStorageType())
         .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
         .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
     ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
-    return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+    return writeClient;
   }
 
   /**
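`createWriteClient(Configuration)` (in what looks like `StreamerUtil`) now builds the client first, so the embedded timeline server is already running when the simplified remote-view storage config is dumped beside the table, and task-side code can reconstruct the same view settings; the new test at the bottom of this diff reads it back via `ViewStorageProperties.loadFromProperties`. A rough driver-side usage sketch; the import path for `HoodieFlinkWriteClient` and the table path are assumptions, the other names are taken from this diff:

```java
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.HoodieFlinkWriteClient; // package assumed
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.ViewStorageProperties;

public class ViewStorageRoundTrip {

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/t1"); // hypothetical table path

    // Driver side: creates the client, starts the embedded timeline server,
    // and dumps the simplified view storage properties under the table path.
    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);

    // Task side: rebuild the same remote-view settings from the dumped file.
    FileSystemViewStorageConfig viewConf =
        ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
    System.out.println(viewConf.getStorageType());

    writeClient.close(); // also shuts down the embedded timeline server
  }
}
```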
@@ -401,7 +401,7 @@ public void testWriteProfileMetadataCache() throws Exception {
   }
 
   private static String getLastCompleteInstant(WriteProfile profile) {
-    return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient());
+    return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
   }
 
   private void assertBucketEquals(
@@ -19,9 +19,12 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.ViewStorageProperties;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
@@ -98,5 +101,13 @@ void testInstantTimeDiff() {
     long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
     assertThat(diff, is(75L));
   }
+
+  @Test
+  void testDumpRemoteViewStorageConfig() throws IOException {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.createWriteClient(conf);
+    FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
+    assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
+  }
 }