diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 55be8fc6598af..49c81b790283c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.table.HoodieTable; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static java.util.stream.Collectors.toList; @@ -37,6 +37,26 @@ */ public class HoodieIndexUtils { + /** + * Fetches the latest {@link HoodieBaseFile}s for the given partition. + * + * @param partition Partition of interest + * @param hoodieTable Instance of {@link HoodieTable} of interest + * @return the list of {@link HoodieBaseFile}s in the partition + */ + public static List getLatestBaseFilesForPartition( + final String partition, + final HoodieTable hoodieTable) { + Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline() + .filterCompletedInstants().lastInstant(); + if (latestCommitTime.isPresent()) { + return hoodieTable.getBaseFileOnlyView() + .getLatestBaseFilesBeforeOrOn(partition, latestCommitTime.get().getTimestamp()) + .collect(toList()); + } + return Collections.emptyList(); + } + /** * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions. * @@ -50,15 +70,11 @@ public static List> getLatestBaseFilesForAllPartiti final HoodieTable hoodieTable) { context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions"); return context.flatMap(partitions, partitionPath -> { - Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline() - .filterCompletedInstants().lastInstant(); - List> filteredFiles = new ArrayList<>(); - if (latestCommitTime.isPresent()) { - filteredFiles = hoodieTable.getBaseFileOnlyView() - .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) - .map(f -> Pair.of(partitionPath, f)) - .collect(toList()); - } + List> filteredFiles = + getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream() + .map(baseFile -> Pair.of(partitionPath, baseFile)) + .collect(toList()); + return filteredFiles.stream(); }, Math.max(partitions.size(), 1)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 3b989db2af1a2..0f193c527d3e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -46,7 +46,6 @@ import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.JsonDecoder; import org.apache.avro.io.JsonEncoder; -import org.codehaus.jackson.node.NullNode; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -235,9 +234,9 @@ public static Schema getRecordKeyPartitionPathSchema() { Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false); Schema.Field recordKeyField = - new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); Schema.Field 
partitionPathField = - new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); + new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); toBeAddedFields.add(recordKeyField); toBeAddedFields.add(partitionPathField); diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java index 590ee3be84356..70289b7b512c8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -21,25 +21,41 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; /** * The function to build the write profile incrementally for records within a checkpoint, @@ -48,8 +64,8 @@ *

All the records are tagged with HoodieRecordLocation, instead of real instant time, * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides - * where the record should write to. The "I" and "U" tag is only used for downstream to decide whether - * the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer + * where the record should write to. The "I" and "U" tags are only used for downstream to decide whether + * the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer * supports specifying the bucket type explicitly. * *

The output records should then shuffle by the bucket ID and thus do scalable write. @@ -60,14 +76,51 @@ public class BucketAssignFunction> extends KeyedProcessFunction implements CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class); + + private HoodieFlinkEngineContext context; + + /** + * Index cache (speed-up) state for the underlying file-based (BloomFilter) indices. + * When a record comes in, we do these checks: + * + *

    + *
+   * <ul>
+   *   <li>Try to load all the records in the partition path where the record belongs to</li>
+   *   <li>Checks whether the state contains the record key</li>
+   *   <li>If it does, tag the record with the location</li>
+   *   <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
+   * </ul>
+ */ private MapState indexState; + /** + * Bucket assigner to assign new bucket IDs or reuse existing ones. + */ private BucketAssigner bucketAssigner; private final Configuration conf; + private transient org.apache.hadoop.conf.Configuration hadoopConf; + private final boolean isChangingRecords; + /** + * All the partition paths when the task starts. It is used to help check whether all the partitions + * are loaded into the state. + */ + private transient List initialPartitionsToLoad; + + /** + * State to book-keep which partitions have been loaded into the index state {@code indexState}. + */ + private MapState partitionLoadState; + + /** + * Whether all partitions are loaded; if true, + * we only need to check the state for locations. + */ + private boolean allPartitionsLoaded = false; + public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( @@ -78,13 +131,20 @@ public BucketAssignFunction(Configuration conf) { public void open(Configuration parameters) throws Exception { super.open(parameters); HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(StreamerUtil.getHadoopConf()), - new FlinkTaskContextSupplier(getRuntimeContext())); - this.bucketAssigner = new BucketAssigner( - context, - writeConfig); + this.hadoopConf = StreamerUtil.getHadoopConf(); + this.context = new HoodieFlinkEngineContext( + new SerializableConfiguration(this.hadoopConf), + new FlinkTaskContextSupplier(getRuntimeContext())); + this.bucketAssigner = new BucketAssigner(context, writeConfig); + List allPartitionPaths = FSUtils.getAllPartitionPaths(this.context, + this.conf.getString(FlinkOptions.PATH), false, false, false); + final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + final int taskID = getRuntimeContext().getIndexOfThisSubtask(); + // reference: org.apache.flink.streaming.api.datastream.KeyedStream + this.initialPartitionsToLoad = allPartitionPaths.stream() + .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID) + .collect(Collectors.toList()); } @Override @@ -100,6 +160,12 @@ public void initializeState(FunctionInitializationContext context) { TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class)); indexState = context.getKeyedStateStore().getMapState(indexStateDesc); + MapStateDescriptor partitionLoadStateDesc = + new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT); + partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc); + if (context.isRestored()) { + checkPartitionsLoaded(); + } } @SuppressWarnings("unchecked") @@ -112,6 +178,10 @@ public void processElement(I value, Context ctx, Collector out) throws Except final HoodieKey hoodieKey = record.getKey(); final BucketInfo bucketInfo; final HoodieRecordLocation location; + if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) { + // If the partition records have never been loaded, load them first. + loadRecords(hoodieKey.getPartitionPath()); + } // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. 
if (isChangingRecords && this.indexState.contains(hoodieKey)) { @@ -146,5 +216,69 @@ public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. this.bucketAssigner.reset(); this.bucketAssigner.refreshTable(); + checkPartitionsLoaded(); + } + + /** + * Load all the indices of the given partition path into the backing state. + * + * @param partitionPath The partition path + * @throws Exception when an error occurs during the state update + */ + private void loadRecords(String partitionPath) throws Exception { + HoodieTable hoodieTable = bucketAssigner.getTable(); + List latestBaseFiles = + HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable); + for (HoodieBaseFile baseFile : latestBaseFiles) { + List hoodieKeys = + ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath())); + hoodieKeys.forEach(hoodieKey -> { + try { + this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + } catch (Exception e) { + throw new HoodieIOException("Error when loading record keys from file: " + baseFile); + } + }); + } + // Mark the partition path as loaded. + partitionLoadState.put(partitionPath, 0); + } + + /** + * Checks whether all the partitions of the table are loaded into the state, + * and sets the flag {@code allPartitionsLoaded} to true if they are. + */ + private void checkPartitionsLoaded() { + for (String partition : this.initialPartitionsToLoad) { + try { + if (!this.partitionLoadState.contains(partition)) { + return; + } + } catch (Exception e) { + LOG.warn("Error when checking whether all partitions are loaded", e); + throw new HoodieException(e); + } + } + this.allPartitionsLoaded = true; + } + + @VisibleForTesting + public boolean isAllPartitionsLoaded() { + return this.allPartitionsLoaded; + } + + @VisibleForTesting + public void clearIndexState() { + this.allPartitionsLoaded = false; + this.indexState.clear(); + } + + @VisibleForTesting + public boolean isKeyInState(HoodieKey hoodieKey) { + try { + return this.indexState.contains(hoodieKey); + } catch (Exception e) { + throw new HoodieException(e); + } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java index f87a802cfa5ae..793acdca6941b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java @@ -207,6 +207,10 @@ public void refreshTable() { this.table = HoodieFlinkTable.create(this.config, this.context); } + public HoodieTable getTable() { + return table; + } + /** * Returns a list of small files in the given partition path. 
*/ diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java index 12a00dd8eaf1b..492a50585d137 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.exception.HoodieException; @@ -48,6 +49,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -382,6 +384,68 @@ public void testInsertWithMiniBatches() throws Exception { checkWrittenData(tempFile, expected, 1); } + @Test + public void testIndexStateBootstrap() throws Exception { + // open the function and ingest data + funcWrapper.openFunction(); + for (RowData rowData : TestData.DATA_SET_ONE) { + funcWrapper.invoke(rowData); + } + + assertEmptyDataFiles(); + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + + OperatorEvent nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + funcWrapper.checkpointComplete(1); + + // Mark the index state as not fully loaded to trigger re-load from the filesystem. + funcWrapper.clearIndexState(); + + // upsert another data buffer + for (RowData rowData : TestData.DATA_SET_TWO) { + funcWrapper.invoke(rowData); + } + checkIndexLoaded( + new HoodieKey("id1", "par1"), + new HoodieKey("id2", "par1"), + new HoodieKey("id3", "par2"), + new HoodieKey("id4", "par2"), + new HoodieKey("id5", "par3"), + new HoodieKey("id6", "par3"), + new HoodieKey("id7", "par4"), + new HoodieKey("id8", "par4")); + // the data is not flushed yet + checkWrittenData(tempFile, EXPECTED1); + // this triggers the data write and event send + funcWrapper.checkpointFunction(2); + + String instant = funcWrapper.getWriteClient() + .getInflightAndRequestedInstant("COPY_ON_WRITE"); + + nextEvent = funcWrapper.getNextEvent(); + assertThat("The operator is expected to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + checkWrittenData(tempFile, EXPECTED2); + + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); + assertFalse(funcWrapper.isAllPartitionsLoaded(), + "Not all partitions should be loaded into the index state yet"); + funcWrapper.checkpointComplete(2); + // the coordinator checkpoint commits the inflight instant. 
+ checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + checkWrittenData(tempFile, EXPECTED2); + assertTrue(funcWrapper.isAllPartitionsLoaded(), + "All partitions should have been loaded into the index state"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -419,4 +483,11 @@ private void assertEmptyDataFiles() { assertNotNull(dataFiles); assertThat(dataFiles.length, is(0)); } + + private void checkIndexLoaded(HoodieKey... keys) { + for (HoodieKey key : keys) { + assertTrue(funcWrapper.isKeyInState(key), + "Key: " + key + " is expected to be in the index state"); + } + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java index 40e58fe1fafa8..2aae03326222f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java @@ -33,7 +33,7 @@ public MockFunctionInitializationContext() { @Override public boolean isRestored() { - throw new UnsupportedOperationException(); + return false; } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java index 6f7062c6fc019..d16d4d7358099 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -19,6 +19,7 @@ package org.apache.hudi.operator.utils; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.operator.StreamWriteFunction; import org.apache.hudi.operator.StreamWriteOperatorCoordinator; @@ -162,4 +163,16 @@ public void close() throws Exception { public StreamWriteOperatorCoordinator getCoordinator() { return coordinator; } + + public void clearIndexState() { + this.bucketAssignerFunction.clearIndexState(); + } + + public boolean isKeyInState(HoodieKey hoodieKey) { + return this.bucketAssignerFunction.isKeyInState(hoodieKey); + } + + public boolean isAllPartitionsLoaded() { + return this.bucketAssignerFunction.isAllPartitionsLoaded(); + } }
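Note for reviewers: the index bootstrap that this patch wires into BucketAssignFunction condenses to the helper below. It is a minimal sketch built only on APIs that appear in the diff (HoodieIndexUtils#getLatestBaseFilesForPartition, ParquetUtils#fetchRecordKeyPartitionPathFromParquet, HoodieRecordLocation); the class name IndexBootstrapSketch and the method shape are illustrative, not part of the change.

import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;

/** Illustrative sketch of the bootstrap performed by BucketAssignFunction#loadRecords. */
public class IndexBootstrapSketch {

  /** Folds the record-key to location mapping of one partition into the keyed index state. */
  public static void loadPartition(
      String partitionPath,
      HoodieTable hoodieTable,
      Configuration hadoopConf,
      MapState<HoodieKey, HoodieRecordLocation> indexState) throws Exception {
    // Latest base file per file group in the partition, as of the last completed commit.
    List<HoodieBaseFile> latestBaseFiles =
        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
    for (HoodieBaseFile baseFile : latestBaseFiles) {
      // Only the record key and partition path metadata columns are read from the parquet file.
      List<HoodieKey> keys =
          ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
      for (HoodieKey key : keys) {
        indexState.put(key, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
      }
    }
  }
}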
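Note on the HoodieAvroUtils change: NullNode belongs to the org.codehaus.jackson API that newer Avro releases no longer accept in the Schema.Field constructor; the Object-based constructor takes JsonProperties.NULL_VALUE for a null default instead. The standalone sketch below shows the same pattern; the nullable-string schema stands in for Hudi's METADATA_FIELD_SCHEMA and is an assumption of this example.

import java.util.Arrays;
import java.util.Collections;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;

public class NullDefaultExample {
  public static void main(String[] args) {
    // Nullable string field schema; stands in for Hudi's METADATA_FIELD_SCHEMA here.
    Schema nullableString = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
    // With the Object-based Field constructor, a null default is expressed with
    // JsonProperties.NULL_VALUE rather than org.codehaus.jackson's NullNode.
    Schema.Field recordKeyField =
        new Schema.Field("_hoodie_record_key", nullableString, "", JsonProperties.NULL_VALUE);
    Schema record = Schema.createRecord("HoodieRecordKey", "", "", false);
    record.setFields(Collections.singletonList(recordKeyField));
    System.out.println(record.toString(true));
  }
}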