diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 5751dbbf0b5c3..56ebe6a8ed1ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -240,7 +240,7 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
       HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
     final Option<String> instantTime = hoodieTable
         .getMetaClient()
-        .getCommitsTimeline()
+        .getActiveTimeline() // we need all completed actions, not just commits
         .filterCompletedInstants()
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index b5299ffa9bf70..549d0b01f3589 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -254,11 +254,10 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
-        if (logBlock.getBlockType() != CORRUPT_BLOCK
-            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
+        if (logBlock.isDataOrDeleteBlock() && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
         )) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
+          // hit a data block with an instant time greater than what should be processed; skip this block and keep scanning
+          continue;
         }
         if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
           if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index f98097e39d5c5..4437b578bce1e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -95,6 +95,10 @@ public byte[] getMagic() {
 
   public abstract HoodieLogBlockType getBlockType();
 
+  public boolean isDataOrDeleteBlock() {
+    return getBlockType() != HoodieLogBlockType.COMMAND_BLOCK && getBlockType() != HoodieLogBlockType.CORRUPT_BLOCK;
+  }
+
   public long getLogBlockLength() {
     throw new HoodieException("No implementation was provided");
   }
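The reader-side change above is the core of the fix: log blocks are no longer assumed to arrive in instant-time order, so a data/delete block written by an instant newer than the reader's latest instant is skipped individually instead of terminating the scan. Below is a minimal, illustrative sketch of the revised filtering rule; it reuses the real HoodieLogBlock and HoodieTimeline APIs touched by this patch, but the helper class and method names are hypothetical and not part of the change.

import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;

// Hypothetical helper, only to illustrate the new scanInternalV1 behavior.
final class LogBlockFilterSketch {

  // True if this block should be skipped: it carries data or deletes and was written by an
  // instant newer than the latest instant the reader is allowed to see (for example, an
  // instant that was rolled back or is still inflight).
  static boolean shouldSkipBlock(HoodieLogBlock logBlock, String latestInstantTime) {
    return logBlock.isDataOrDeleteBlock()
        && !HoodieTimeline.compareTimestamps(
            logBlock.getLogBlockHeader().get(INSTANT_TIME),
            HoodieTimeline.LESSER_THAN_OR_EQUALS,
            latestInstantTime);
  }

  private LogBlockFilterSketch() {
  }
}

With the previous break, every remaining block in the log file was dropped as soon as one too-new block was hit; once rollbacks are involved that time-ordering assumption no longer holds, hence the continue.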
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index e53cfce8ece78..b02498e5c7fbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -25,14 +25,19 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -42,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -73,6 +79,19 @@ private static Stream<Arguments> getTableTypeAndIndexType() {
     );
   }
 
+  private static Stream<Arguments> getTableTypeAndIndexTypeUpdateOrDelete() {
+    return Stream.of(
+        Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE, true),
+        Arguments.of(COPY_ON_WRITE, RECORD_INDEX, true),
+        Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE, false),
+        Arguments.of(COPY_ON_WRITE, RECORD_INDEX, false),
+        Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE, true),
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, true),
+        Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE, false),
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, false)
+    );
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -129,6 +148,92 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
     }
   }
 
+  /**
+   * Tests that rollbacks are honored when records move to a new partition via upsert or delete with a global index.
+   * @throws IOException
+   */
+  @ParameterizedTest
+  @MethodSource("getTableTypeAndIndexTypeUpdateOrDelete")
+  public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException {
+    final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
+    List<HoodieRecord> updatesAtEpoch5 = null;
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      final int totalRecords = 8;
+      final String p1 = "p1";
+      final String p2 = "p2";
+
+      // 1st batch: inserts
+      String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch0);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());
+
+      // 2nd batch: update 4 records from p1 to p2
+      String commitTimeAtEpoch5 = HoodieActiveTimeline.createNewInstantTime();
+      updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch5);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+      // simulate a crash: delete the latest completed delta commit instant file
+      String latestCompletedDeltaCommit = metaClient.reloadActiveTimeline().getCommitsAndCompactionTimeline().lastInstant().get().getFileName();
+      metaClient.getFs().delete(new Path(metaClient.getBasePathV2() + "/.hoodie/" + latestCompletedDeltaCommit));
+
+    }
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      final String p1 = "p1";
+      final String p2 = "p2";
+      final String p3 = "p3";
+
+      // re-ingest the same batch
+      String commitTimeAtEpoch10 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commitTimeAtEpoch10);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect());
+        // this also tests the snapshot query; we had a bug where the MOR snapshot reader ignored rollbacks while determining the last instant for reading log records.
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch10).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+
+      // upsert test:
+      //   update 4 of them from p2 to p3.
+      // delete test:
+      //   update 4 of them to p3; these are treated as new inserts since they were deleted, so no changes should be seen wrt p2.
+      String commitTimeAtEpoch15 = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch15);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
+      // for the same bug pointed out earlier (ignoring rollbacks while determining the last instant for reading log records), this exercises the HoodieMergedReadHandle.
+      readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+      readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15);
+
+      // let's move 2 of them back to p1
+      String commitTimeAtEpoch20 = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch20);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect());
+      // for the same bug pointed out earlier (ignoring rollbacks while determining the last instant for reading log records), this exercises the HoodieMergedReadHandle.
+      Map<String, Long> expectedTsMap = new HashMap<>();
+      Arrays.stream(new int[] {0, 1}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 20L));
+      Arrays.stream(new int[] {4, 5, 6, 7}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 0L));
+      readTableAndValidate(metaClient, new int[] {0, 1, 4, 5, 6, 7}, p1, expectedTsMap);
+      readTableAndValidate(metaClient, new int[] {2, 3}, p3, 15);
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testUpdatePartitionsThenDelete(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -246,9 +351,8 @@ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expect
         .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts")
         .cache();
     int expectedCount = expectedIds.length;
-    assertEquals(expectedCount, df.count());
     assertEquals(expectedCount, df.filter(String.format("pt = '%s'", expectedPartition)).count());
-    Row[] allRows = (Row[]) df.collect();
+    Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'", expectedPartition)).collect();
     for (int i = 0; i < expectedCount; i++) {
       int expectedId = expectedIds[i];
       Row r = allRows[i];
@@ -283,6 +387,8 @@ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType indexT
             .withGlobalBloomIndexUpdatePartitionPath(true)
             .withGlobalSimpleIndexUpdatePartitionPath(true)
             .withRecordIndexUpdatePartitionPath(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4).build())
         .withSchema(SCHEMA_STR)
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .fromProperties(getPayloadProps(payloadClass)).build())
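For reference, the crash-simulation step in the new test boils down to deleting the completed instant file of the last commit/delta commit so that the next writer session rolls that instant back before the batch is re-ingested. A standalone sketch of that step is shown below; it uses the same meta-client calls as the test itself, but the helper class and method names are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

// Hypothetical helper mirroring the inline crash simulation in testRollbacksWithPartitionUpdate.
final class CrashSimulationSketch {

  // Deletes the completed instant file of the latest commit/delta commit, leaving the instant
  // looking incomplete so that the next writer session can roll it back on startup.
  static void deleteLatestCompletedInstantFile(HoodieTableMetaClient metaClient) throws IOException {
    HoodieInstant latest = metaClient.reloadActiveTimeline()
        .getCommitsAndCompactionTimeline()
        .lastInstant()
        .get();
    metaClient.getFs().delete(new Path(metaClient.getBasePathV2() + "/.hoodie/" + latest.getFileName()));
  }

  private CrashSimulationSketch() {
  }
}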