Merged
@@ -27,7 +27,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.util.stream.Collectors.toList;
@@ -37,6 +37,27 @@
*/
public class HoodieIndexUtils {

/**
* Fetches the latest {@link HoodieBaseFile}s for the given partition.
*
* @param partition Partition of interest
* @param context Instance of {@link HoodieEngineContext} to use
* @param hoodieTable Instance of {@link HoodieTable} of interest
* @return the list of {@link HoodieBaseFile}
*/
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
Member: Shall we use this in getLatestBaseFilesForAllPartitions to avoid duplicating code?

Contributor Author: I think we could.

Member: It's good that getLatestBaseFilesForPartition was extracted from getLatestBaseFilesForAllPartitions.

Current codebase:

  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
      final String partition,
      final HoodieTable hoodieTable) {
    Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
        .filterCompletedInstants().lastInstant();
    if (latestCommitTime.isPresent()) {
      return hoodieTable.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp())
          .collect(toList());
    }
    return Collections.emptyList();
  }

Maybe the following implementation is more efficient

  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
      final String partition,
      final HoodieTable hoodieTable) {
    return hoodieTable.getFileSystemView()
        .getAllFileGroups(partition)
        .map(HoodieFileGroup::getLatestDataFile)
        .filter(Option::isPresent)
        .map(Option::get)
        .collect(toList());
  }

Member: Refer to SparkHoodieBackedTableMetadataWriter#prepRecords.

Contributor Author: Agree, there is no need to decide and compare the instant time here, but I would not promote it in this PR because it is not related.

You can promote it in a separate JIRA issue.

final String partition,
final HoodieTable hoodieTable) {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
return hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp())
.collect(toList());
}
return Collections.emptyList();
}

/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
*
@@ -50,15 +71,11 @@ public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartiti
final HoodieTable hoodieTable) {
context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
return context.flatMap(partitions, partitionPath -> {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles = hoodieTable.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.map(f -> Pair.of(partitionPath, f))
.collect(toList());
}
List<Pair<String, HoodieBaseFile>> filteredFiles =
getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()
.map(baseFile -> Pair.of(partitionPath, baseFile))
.collect(toList());

return filteredFiles.stream();
}, Math.max(partitions.size(), 1));
}
@@ -46,7 +46,6 @@
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.codehaus.jackson.node.NullNode;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -235,9 +234,9 @@ public static Schema getRecordKeyPartitionPathSchema() {
Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);

Schema.Field recordKeyField =
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
Schema.Field partitionPathField =
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);

toBeAddedFields.add(recordKeyField);
toBeAddedFields.add(partitionPathField);
@@ -21,25 +21,41 @@
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.operator.FlinkOptions;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

/**
* The function to build the write profile incrementally for records within a checkpoint,
@@ -48,8 +64,8 @@
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
* where the record should write to. The "I" and "U" tag is only used for downstream to decide whether
* the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer
* where the record should write to. The "I" and "U" tags are only used for downstream to decide whether
* the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer
* supports specifying the bucket type explicitly.
*
* <p>The output records should then shuffle by the bucket ID and thus do scalable write.
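As an aside, a minimal sketch of the tagging convention this Javadoc describes; the isUpsert flag and fileId below are placeholders, not names from the PR:

    // Records headed to an existing file group are tagged "U" (UPSERT bucket),
    // records opening a new file group are tagged "I" (INSERT bucket); the
    // pseudo instant time only tells the downstream writer which bucket type it is.
    // isUpsert and fileId are hypothetical placeholders for this sketch.
    HoodieRecordLocation location = isUpsert
        ? new HoodieRecordLocation("U", fileId)
        : new HoodieRecordLocation("I", fileId);
    record.setCurrentLocation(location);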
@@ -60,14 +76,51 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction, CheckpointListener {

private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);

private HoodieFlinkEngineContext context;

/**
* Index cache (speed-up) state for the underlying file-based (BloomFilter) indices.
* When a record comes in, we do these checks:
*
* <ul>
* <li>Try to load all the records in the partition path where the record belongs</li>
* <li>Check whether the state contains the record key</li>
* <li>If it does, tag the record with the location</li>
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul>
*/
private MapState<HoodieKey, HoodieRecordLocation> indexState;

/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
*/
private BucketAssigner bucketAssigner;

private final Configuration conf;

private transient org.apache.hadoop.conf.Configuration hadoopConf;

private final boolean isChangingRecords;

/**
* All the partition paths when the task starts. It is used to help check whether all the partitions
* are loaded into the state.
*/
private transient List<String> initialPartitionsToLoad;

/**
* State to book-keep which partitions have been loaded into the index state {@code indexState}.
*/
private MapState<String, Integer> partitionLoadState;

/**
* Whether all partitions are loaded; if true,
* we only need to check the state for locations.
*/
private boolean allPartitionsLoaded = false;

public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = new BucketAssigner(
context,
writeConfig);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.context = new HoodieFlinkEngineContext(
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = new BucketAssigner(context, writeConfig);
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
this.conf.getString(FlinkOptions.PATH), false, false, false);
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
// reference: org.apache.flink.streaming.api.datastream.KeyedStream
this.initialPartitionsToLoad = allPartitionPaths.stream()
Contributor (@hk-lrzy, Feb 23, 2021):
    if (context.isRestored()) {
      checkPartitionsLoaded();
    }

When restored from a checkpoint, initialPartitionsToLoad has not been initialized yet.

Contributor Author: Yes, welcome to fire a fix and add test cases.
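For context on the point above: Flink calls initializeState() before open(), so on restore the checkPartitionsLoaded() call would see a null initialPartitionsToLoad. One possible guard, as a sketch only (not part of this PR):

  private void checkPartitionsLoaded() {
    // Hypothetical guard: before open() has run, the partition list owned by
    // this subtask has not been computed yet, so there is nothing to verify.
    if (this.initialPartitionsToLoad == null) {
      return;
    }
    for (String partition : this.initialPartitionsToLoad) {
      try {
        if (!this.partitionLoadState.contains(partition)) {
          return;
        }
      } catch (Exception e) {
        throw new HoodieException(e);
      }
    }
    this.allPartitionsLoaded = true;
  }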

.filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
.collect(Collectors.toList());
}

@Override
@@ -100,6 +160,12 @@ public void initializeState(FunctionInitializationContext context) {
TypeInformation.of(HoodieKey.class),
TypeInformation.of(HoodieRecordLocation.class));
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
if (context.isRestored()) {
checkPartitionsLoaded();
}
}

@SuppressWarnings("unchecked")
@@ -112,6 +178,10 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
Member: The allPartitionsLoaded member variable seems redundant, can we only use partitionLoadState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, allPartitionsLoaded flag is used to speed up so that there is no need to query the state which is not very efficient.

Member: Here, using &&, the second statement will always be executed.

Member: By the way, allPartitionPath is only initialized in the BucketAssignFunction#open method; it seems we also need to update allPartitionPath?

Contributor Author: We only need to ensure the initial partitions are loaded successfully; new input data would trigger an index update if there are new data partitions.
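A sketch of the trade-off discussed in this thread: the boolean is a plain field read, while the MapState lookup goes to the state backend; once the flag flips, && short-circuits and the lookup is skipped. The comments below are illustrative, not from the PR:

    // Plain field read: effectively free once all initial partitions are loaded.
    if (!allPartitionsLoaded
        // State-backend lookup (e.g. RocksDB): comparatively expensive, only
        // evaluated while allPartitionsLoaded is still false.
        && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
      loadRecords(hoodieKey.getPartitionPath());
    }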

// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
if (isChangingRecords && this.indexState.contains(hoodieKey)) {
@@ -146,5 +216,69 @@ public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reset();
this.bucketAssigner.refreshTable();
checkPartitionsLoaded();
}

/**
* Load all the indices of the given partition path into the backup state.
*
* @param partitionPath The partition path
* @throws Exception when error occurs for state update
*/
private void loadRecords(String partitionPath) throws Exception {
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
for (HoodieBaseFile baseFile : latestBaseFiles) {
List<HoodieKey> hoodieKeys =
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
hoodieKeys.forEach(hoodieKey -> {
try {
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
} catch (Exception e) {
throw new HoodieIOException("Error when load record keys from file: " + baseFile);
}
});
}
// Mark the partition path as loaded.
partitionLoadState.put(partitionPath, 0);
Member: For the 0, it is better to use a static constant instead.

Contributor Author: It is okay because only one code snippet uses it.

Member: The 0 is meaningless here; it may not be intuitive for beginners.

Member: Maybe put a boolean?

Contributor Author (quoting "The 0 is meaningless here"): It is meaningless anyway, because Flink does not have Set state.
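A sketch of the point above: with no SetState in Flink, a MapState with a throwaway value plays the role of a set; a named constant (hypothetical here, the PR keeps the literal 0) would document that only key presence matters. Exception handling is elided:

  // Hypothetical constant; any value works because only key-presence
  // lookups are performed against this state.
  private static final Integer PARTITION_LOADED = 0;

  // "Add to set": the value stored is irrelevant.
  partitionLoadState.put(partitionPath, PARTITION_LOADED);
  // "Contains" check: only the key is inspected.
  boolean loaded = partitionLoadState.contains(partitionPath);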

}

/**
* Checks whether all the partitions of the table are loaded into the state,
* and sets the flag {@code allPartitionsLoaded} to true if so.
*/
private void checkPartitionsLoaded() {
for (String partition : this.initialPartitionsToLoad) {
try {
if (!this.partitionLoadState.contains(partition)) {
return;
}
} catch (Exception e) {
LOG.warn("Error when check whether all partitions are loaded, ignored", e);
throw new HoodieException(e);
}
}
this.allPartitionsLoaded = true;
}

@VisibleForTesting
public boolean isAllPartitionsLoaded() {
return this.allPartitionsLoaded;
}

@VisibleForTesting
public void clearIndexState() {
this.allPartitionsLoaded = false;
this.indexState.clear();
}

@VisibleForTesting
public boolean isKeyInState(HoodieKey hoodieKey) {
try {
return this.indexState.contains(hoodieKey);
} catch (Exception e) {
throw new HoodieException(e);
}
}
}
@@ -207,6 +207,10 @@ public void refreshTable() {
this.table = HoodieFlinkTable.create(this.config, this.context);
}

public HoodieTable<?, ?, ?, ?> getTable() {
return table;
}

/**
* Returns a list of small files in the given partition path.
*/