@@ -22,20 +22,24 @@
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.StreamerUtil;

import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
@@ -54,11 +58,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static java.util.stream.Collectors.toList;

/**
* The function to load index from existing hoodieTable.
*
@@ -78,6 +83,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
private final Configuration conf;

private transient org.apache.hadoop.conf.Configuration hadoopConf;
private transient HoodieWriteConfig writeConfig;

private GlobalAggregateManager aggregateManager;
private ListState<Boolean> bootstrapState;
@@ -108,13 +114,14 @@ public void initializeState(FunctionInitializationContext context) throws Except
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
this.hoodieTable = getTable();
this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
}

@Override
@SuppressWarnings("unchecked")
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (!alreadyBootstrap) {
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
@@ -155,11 +162,10 @@ private void waitForBootstrapReady(int taskID) {
}

private HoodieFlinkTable getTable() {
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
return HoodieFlinkTable.create(writeConfig, context);
return HoodieFlinkTable.create(this.writeConfig, context);
}

/**
@@ -168,32 +174,64 @@ private HoodieFlinkTable getTable() {
* @param partitionPath The partition path
*/
@SuppressWarnings("unchecked")
private void loadRecords(String partitionPath, Collector<O> out) {
private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
long start = System.currentTimeMillis();

BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();

final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
for (HoodieBaseFile baseFile : latestBaseFiles) {
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
baseFile.getFileId(), maxParallelism, parallelism) == taskID;

if (shouldLoad) {
LOG.info("Load records from file {}.", baseFile);
final List<HoodieKey> hoodieKeys;
try {
hoodieKeys =
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();

if (latestCommitTime.isPresent()) {
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
.collect(toList());

for (FileSlice fileSlice : fileSlices) {
if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
continue;
}
LOG.info("Load records from {}.", fileSlice);

// load parquet records
fileSlice.getBaseFile().ifPresent(baseFile -> {
// filter out crushed files
if (baseFile.getFileSize() <= 0) {
return;
}

final List<HoodieKey> hoodieKeys;
try {
hoodieKeys =
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
}

for (HoodieKey hoodieKey : hoodieKeys) {
out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
}
});

// load avro log records
List<String> logPaths = fileSlice.getLogFiles()
// filter out crushed files
.filter(logFile -> logFile.getFileSize() > 0)
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());

for (HoodieKey hoodieKey : hoodieKeys) {
out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));
try {
for (String recordKey : scanner.getRecords().keySet()) {
out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
}
}
}
@@ -203,17 +241,44 @@ private void loadRecords(String partitionPath, Collector<O> out) {
this.getClass().getSimpleName(), taskID, partitionPath, cost);
}

private HoodieMergedLogRecordScanner scanLog(
List<String> logPaths,
Schema logSchema,
String latestInstantTime) {
String basePath = this.hoodieTable.getMetaClient().getBasePath();
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
.withBasePath(basePath)
.withLogFilePaths(logPaths)
.withReaderSchema(logSchema)
.withLatestInstantTime(latestInstantTime)
.withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
.withReverseReader(false)
.withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
.withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
.withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
.build();
}

@SuppressWarnings("unchecked")
public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
hoodieRecord.seal();

return hoodieRecord;
}

private static boolean shouldLoadFile(String fileId,
int maxParallelism,
int parallelism,
int taskID) {
return KeyGroupRangeAssignment.assignKeyToParallelOperator(
fileId, maxParallelism, parallelism) == taskID;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void notifyCheckpointComplete(long checkpointId) {
// no operation
}
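
For reference, the file-to-subtask routing introduced by the shouldLoadFile(...) helper above can be exercised in isolation: Flink's KeyGroupRangeAssignment hashes the file id into a key group and maps that group to an operator index, so each file slice is loaded by exactly one bootstrap subtask. A minimal standalone sketch follows (not part of this diff; the file ids are invented and flink-runtime is assumed to be on the classpath):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.Arrays;
import java.util.List;

/**
 * Minimal standalone sketch (not part of this diff): demonstrates the same
 * KeyGroupRangeAssignment call that shouldLoadFile(...) uses to decide which
 * bootstrap subtask owns a given file slice. The file ids below are invented.
 */
public class FileSliceAssignmentSketch {
  public static void main(String[] args) {
    final int parallelism = 4;
    final int maxParallelism = 128; // Flink's default max parallelism
    List<String> fileIds = Arrays.asList(
        "0d1a2b3c-0001", "0d1a2b3c-0002", "0d1a2b3c-0003");

    for (String fileId : fileIds) {
      // key -> key group -> operator index; only the matching subtask loads the slice
      int owningSubtask = KeyGroupRangeAssignment
          .assignKeyToParallelOperator(fileId, maxParallelism, parallelism);
      System.out.println("file " + fileId + " -> subtask " + owningSubtask);
    }
  }
}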

@@ -571,7 +571,7 @@ public void testInsertWithSmallBufferSize() throws Exception {
checkWrittenData(tempFile, expected, 1);
}

Map<String, String> getMiniBatchExpected() {
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// the last 2 lines are merged
expected.put("par1", "["
@@ -581,6 +581,10 @@ Map<String, String> getMiniBatchExpected() {
return expected;
}

protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED2;
}

@Test
public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data
@@ -637,7 +641,9 @@ public void testIndexStateBootstrap() throws Exception {

nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
checkWrittenData(tempFile, EXPECTED2);

Map<String, String> expected = getExpectedBeforeCheckpointComplete();
checkWrittenData(tempFile, expected);

funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
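
The test-side change follows a template-method pattern: the base write test now reads its expectation through the overridable getExpectedBeforeCheckpointComplete() hook, and the MOR test further down overrides it to return EXPECTED1 instead of EXPECTED2, presumably because the data visible before the checkpoint completes differs between the two table types. A minimal standalone sketch of that pattern (not part of this diff; class names and expected values are invented):

import java.util.Collections;
import java.util.Map;

// Minimal sketch (not part of this diff) of the overridable-expectation pattern
// used by the tests. Class names and expected values below are invented.
class BaseWriteScenarioSketch {
  // Default expectation used by the base (COW) scenario.
  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
    return Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1]");
  }

  void verifyBeforeCheckpointComplete() {
    Map<String, String> expected = getExpectedBeforeCheckpointComplete();
    // checkWrittenData(tempFile, expected); // compare table contents against the hook's expectation
  }
}

class MergeOnReadScenarioSketch extends BaseWriteScenarioSketch {
  @Override
  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
    // The MOR variant expects a different snapshot at this point.
    return Collections.singletonMap("par1", "[]");
  }
}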
@@ -37,8 +37,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.Comparator;
@@ -83,13 +81,12 @@ protected void checkWrittenData(File baseFile, Map<String, String> expected, int
TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
}

@Disabled
@Test
public void testIndexStateBootstrap() {
// Ignore the index bootstrap because we only support parquet load now.
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
}

Map<String, String> getMiniBatchExpected() {
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// MOR mode merges the messages with the same key.
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
@@ -45,7 +45,7 @@ public void testIndexStateBootstrap() {
// Ignore the index bootstrap because we only support parquet load now.
}

Map<String, String> getMiniBatchExpected() {
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// MOR mode merges the messages with the same key.
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");