diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index f46fadb0a7aab..918d4be7fcd36 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -241,7 +241,7 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
       HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
     final Option<String> instantTime = hoodieTable
         .getMetaClient()
-        .getCommitsTimeline()
+        .getActiveTimeline() // we need to include all actions, but only completed instants
         .filterCompletedInstants()
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4c38d11467a02..01353175912b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -252,13 +252,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
-        if (logBlock.getBlockType() != CORRUPT_BLOCK
-            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
-        )) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
-        }
-        if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
+        if (logBlock.isDataOrDeleteBlock()) {
+          if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+            // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+            continue;
+          }
           if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
             // filter the log block by instant range
             continue;
@@ -440,10 +438,10 @@ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessin
           totalCorruptBlocks.incrementAndGet();
           continue;
         }
-        if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
-            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
+        if (logBlock.isDataOrDeleteBlock()
+            && HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+          // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+          continue;
         }
         if (logBlock.getBlockType() != COMMAND_BLOCK) {
           if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
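For context on the HoodieIndexUtils change above: `getCommitsTimeline()` only covers commit-type actions (commit, delta commit, replace commit), while `getActiveTimeline()` covers every action, including rollbacks and cleans, before `filterCompletedInstants()` narrows the view to completed instants. A toy, self-contained sketch (the `Instant`/`Action` types below are stand-ins, not Hudi classes) of why the last completed instant can differ between the two views, e.g. right after a failed delta commit has been rolled back:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Stand-in model of a Hudi timeline: each instant has an action, a timestamp and a completion flag.
public class LatestCompletedInstantSketch {
  enum Action { COMMIT, DELTA_COMMIT, ROLLBACK, CLEAN }

  static class Instant {
    final Action action;
    final String timestamp;
    final boolean completed;

    Instant(Action action, String timestamp, boolean completed) {
      this.action = action;
      this.timestamp = timestamp;
      this.completed = completed;
    }
  }

  // Mirrors "filterCompletedInstants().lastInstant()" over either the commits-only or the full view.
  static Optional<String> lastCompletedInstant(List<Instant> timeline, boolean commitActionsOnly) {
    return timeline.stream()
        .filter(i -> i.completed)
        .filter(i -> !commitActionsOnly || i.action == Action.COMMIT || i.action == Action.DELTA_COMMIT)
        .map(i -> i.timestamp)
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant(Action.DELTA_COMMIT, "005", true),
        new Instant(Action.DELTA_COMMIT, "010", false),  // failed write, never completed
        new Instant(Action.ROLLBACK, "012", true));      // rollback of the failed write

    System.out.println(lastCompletedInstant(timeline, true));   // Optional[005] -> commits-only view
    System.out.println(lastCompletedInstant(timeline, false));  // Optional[012] -> all-actions view
  }
}
```

The record-index lookup presumably wants the later of the two timestamps so that the subsequent log scan also reflects the effects of the rollback.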
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 2f38dc9b25814..2962e7ddbd5e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -261,13 +261,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
           blockSeqNo = Integer.parseInt(parts[1]);
         }
         totalLogBlocks.incrementAndGet();
-        if (logBlock.getBlockType() != CORRUPT_BLOCK
-            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
-        )) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          continue;
-        }
-        if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
+        if (logBlock.isDataOrDeleteBlock()) {
+          if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+            // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+            continue;
+          }
           if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
               || inflightInstantsTimeline.containsInstant(instantTime)) {
             // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -594,10 +592,10 @@ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessin
           totalCorruptBlocks.incrementAndGet();
          continue;
         }
-        if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
-            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
+        if (logBlock.isDataOrDeleteBlock()
+            && HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+          // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+          continue;
         }
         if (logBlock.getBlockType() != COMMAND_BLOCK) {
           if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index f806d635fa9af..73e199225a8c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -98,6 +98,10 @@ public byte[] getMagic() {
 
   public abstract HoodieLogBlockType getBlockType();
 
+  public boolean isDataOrDeleteBlock() {
+    return getBlockType().isDataOrDeleteBlock();
+  }
+
   public long getLogBlockLength() {
     throw new HoodieException("No implementation was provided");
   }
@@ -177,6 +181,13 @@ public static HoodieLogBlockType fromId(String id) {
       return ID_TO_ENUM_MAP.get(id);
     }
+
+    /**
+     * @return true if the log block type refers to a data or delete block, false otherwise.
+     */
+    public boolean isDataOrDeleteBlock() {
+      return this != HoodieLogBlockType.COMMAND_BLOCK && this != HoodieLogBlockType.CORRUPT_BLOCK;
+    }
   }
 
   /**
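Both log record readers now apply the instant-time guard only to data and delete blocks, and they skip a too-new block rather than aborting the whole scan, so command blocks (e.g. rollback blocks) and any remaining valid blocks are still seen. A self-contained sketch of that filtering rule; `Block` and `BlockType` below are hypothetical stand-ins, not the Hudi classes:

```java
import java.util.Arrays;
import java.util.List;

public class LogBlockFilterSketch {
  enum BlockType {
    AVRO_DATA_BLOCK, DELETE_BLOCK, COMMAND_BLOCK, CORRUPT_BLOCK;

    // Same shape as HoodieLogBlockType#isDataOrDeleteBlock: everything except command/corrupt blocks.
    boolean isDataOrDeleteBlock() {
      return this != COMMAND_BLOCK && this != CORRUPT_BLOCK;
    }
  }

  static class Block {
    final BlockType type;
    final String instantTime;

    Block(BlockType type, String instantTime) {
      this.type = type;
      this.instantTime = instantTime;
    }
  }

  public static void main(String[] args) {
    String latestInstantTime = "015"; // last completed instant the reader is allowed to expose
    List<Block> logBlocks = Arrays.asList(
        new Block(BlockType.AVRO_DATA_BLOCK, "010"),  // committed data -> processed
        new Block(BlockType.AVRO_DATA_BLOCK, "020"),  // newer than latestInstantTime -> skipped
        new Block(BlockType.COMMAND_BLOCK, "021"),    // rollback of 020 -> still processed
        new Block(BlockType.DELETE_BLOCK, "012"));    // older delete appended later -> processed

    for (Block block : logBlocks) {
      if (block.type.isDataOrDeleteBlock() && block.instantTime.compareTo(latestInstantTime) > 0) {
        System.out.println("skip    " + block.type + " @ " + block.instantTime);
        continue; // the old code would break here and miss the remaining blocks
      }
      System.out.println("process " + block.type + " @ " + block.instantTime);
    }
  }
}
```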
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 0e27b3e3f3293..12068b25708b7 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -181,6 +181,20 @@ public void tearDown() throws IOException {
     storage.deleteDirectory(new StoragePath(spillableBasePath));
   }
 
+  @Test
+  public void testHoodieLogBlockTypeIsDataOrDeleteBlock() {
+    List<HoodieLogBlockType> dataOrDeleteBlocks = new ArrayList<>();
+    dataOrDeleteBlocks.add(HoodieLogBlockType.DELETE_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.AVRO_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.PARQUET_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.HFILE_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK);
+
+    Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> {
+      assertEquals(dataOrDeleteBlocks.contains(logBlockType), logBlockType.isDataOrDeleteBlock());
+    });
+  }
+
   @Test
   public void testEmptyLog() throws IOException {
     Writer writer =
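The new functional test below is parameterized through a JUnit 5 `@MethodSource` (`getTableTypeAndIndexTypeUpdateOrDelete`) that supplies `(tableType, indexType, isUpsert)` tuples, running the same scenario once for the upsert flow and once for the delete flow. A minimal, self-contained illustration of the pattern; the class and method names below are made up for the example and are not part of the PR:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class MethodSourceSketchTest {

  // One Arguments row per test invocation: (tableType, indexType, isUpsert).
  static Stream<Arguments> upsertOrDelete() {
    return Stream.of(
        Arguments.of("MERGE_ON_READ", "RECORD_INDEX", true),
        Arguments.of("MERGE_ON_READ", "RECORD_INDEX", false));
  }

  @ParameterizedTest
  @MethodSource("upsertOrDelete")
  void runsOncePerArgumentsRow(String tableType, String indexType, boolean isUpsert) {
    assertEquals("MERGE_ON_READ", tableType);
    assertEquals("RECORD_INDEX", indexType);
    // isUpsert toggles which branch of the scenario the real test exercises.
  }
}
```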
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index 86619019c284d..107a55f9f4f71 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -25,10 +25,15 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.TimeGenerator;
+import org.apache.hudi.common.table.timeline.TimeGenerators;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.spark.SparkConf;
@@ -43,6 +48,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -79,6 +85,13 @@ private static Stream<Arguments> getTableTypeAndIndexType() {
     );
   }
 
+  private static Stream<Arguments> getTableTypeAndIndexTypeUpdateOrDelete() {
+    return Stream.of(
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, true),
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, false)
+    );
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -135,6 +148,88 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
     }
   }
 
+  /**
+   * Tests that a rollback of a partition-updating upsert or delete is honored when the same batch is re-ingested.
+   *
+   * @throws IOException
+   */
+  @ParameterizedTest
+  @MethodSource("getTableTypeAndIndexTypeUpdateOrDelete")
+  public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException {
+    final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+    TimeGenerator timeGenerator = TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig(), storageConf());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
+    final int totalRecords = 8;
+    final String p1 = "p1";
+    final String p2 = "p2";
+    final String p3 = "p3";
+    List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
+    List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      // 1st batch: inserts
+      String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
+      client.startCommitWithTime(commitTimeAtEpoch0);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());
+
+      // 2nd batch: update 4 records from p1 to p2
+      String commitTimeAtEpoch5 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
+      client.startCommitWithTime(commitTimeAtEpoch5);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+      // simulate a crash: delete the latest completed delta commit from the timeline.
+      String latestCompletedDeltaCommit = metaClient.reloadActiveTimeline().getCommitsAndCompactionTimeline().lastInstant().get().getFileName();
+      metaClient.getStorage().deleteFile(new StoragePath(metaClient.getBasePath() + "/.hoodie/" + latestCompletedDeltaCommit));
+    }
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      // re-ingest the same batch
+      String commitTimeAtEpoch10 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
+      client.startCommitWithTime(commitTimeAtEpoch10);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect());
+        // this also exercises the snapshot query path: there was a bug where the MOR snapshot reader ignored rollbacks while determining the last instant for reading log records.
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch10).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+
+      // upsert case: update the same 4 records from p2 to p3.
+      // delete case: update the same 4 records to p3; since they were deleted, they are treated as new inserts and nothing should change for p2.
+      String commitTimeAtEpoch15 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
+      List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch15);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
+      // for the same bug called out earlier (rollbacks ignored while determining the last instant for reading log records), this exercises HoodieMergedReadHandle.
+      readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+      readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15);
+
+      // let's move 2 of them back to p1
+      String commitTimeAtEpoch20 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
+      List<HoodieRecord> updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch20);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect());
+      // again exercises HoodieMergedReadHandle for the same bug (rollbacks ignored while determining the last instant for reading log records).
+      Map<String, Long> expectedTsMap = new HashMap<>();
+      Arrays.stream(new int[] {0, 1}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 20L));
+      Arrays.stream(new int[] {4, 5, 6, 7}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 0L));
+      readTableAndValidate(metaClient, new int[] {0, 1, 4, 5, 6, 7}, p1, expectedTsMap);
+      readTableAndValidate(metaClient, new int[] {2, 3}, p3, 15);
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testUpdatePartitionsThenDelete(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -252,9 +347,8 @@ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expect
         .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts")
         .cache();
     int expectedCount = expectedIds.length;
-    assertEquals(expectedCount, df.count());
-    assertEquals(expectedCount, df.filter(String.format("pt = '%s'", expectedPartition)).count());
-    Row[] allRows = (Row[]) df.collect();
+    Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'", expectedPartition)).collect();
+    assertEquals(expectedCount, allRows.length);
     for (int i = 0; i < expectedCount; i++) {
       int expectedId = expectedIds[i];
       Row r = allRows[i];
@@ -289,6 +383,8 @@ private HoodieWriteConfig getWriteConfig(Class payloadClass, IndexType indexT
             .withGlobalBloomIndexUpdatePartitionPath(true)
             .withGlobalSimpleIndexUpdatePartitionPath(true)
             .withRecordIndexUpdatePartitionPath(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4).build())
         .withSchema(SCHEMA_STR)
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .fromProperties(getPayloadProps(payloadClass)).build())
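The `getWriteConfig` hunk above also caps the number of delta commits before compaction at 4, presumably so that compaction can get scheduled partway through the multi-commit MOR scenario. A minimal sketch of wiring that knob into a write config; the base path and schema below are placeholders, and a Hudi client dependency is assumed:

```java
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class CompactionConfigSketch {
  public static void main(String[] args) {
    // Placeholder Avro schema; the test uses its own SCHEMA_STR.
    String schema = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_sketch_table")              // placeholder base path
        .withSchema(schema)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(4)   // compact after at most 4 delta commits
            .build())
        .build();

    System.out.println(writeConfig.getBasePath());
  }
}
```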