diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 5751dbbf0b5c3..56ebe6a8ed1ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -240,7 +240,7 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
       HoodieData<HoodieRecordGlobalLocation> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
     final Option<String> instantTime = hoodieTable
         .getMetaClient()
-        .getCommitsTimeline()
+        .getActiveTimeline() // we need to include all actions and only completed instants
        .filterCompletedInstants()
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
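
Note on the HoodieIndexUtils change above: getCommitsTimeline() only exposes commit-type actions (commit, deltacommit, replacecommit), while getActiveTimeline() exposes every action on the timeline, so a completed rollback or clean can now become the latest completed instant that getExistingRecords() uses as its read boundary. A minimal sketch of the difference, assuming a meta client for the same table; the instant times below are illustrative, not from this patch:

    // Completed timeline: 100.deltacommit, 200.deltacommit, 300.rollback
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();

    // Commits-only view: the rollback is filtered out, so the last completed instant is "200".
    Option<String> lastCommit = metaClient.getCommitsTimeline()
        .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);

    // Active timeline: all actions are visible, so the last completed instant is the rollback at "300",
    // which is what now flows downstream as the latest instant time.
    Option<String> lastCompleted = metaClient.getActiveTimeline()
        .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
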
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index b5299ffa9bf70..5a8dbb7b0dc37 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -254,15 +254,12 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
-        if (logBlock.getBlockType() != CORRUPT_BLOCK
-            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
-        )) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
-        }
-        if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
-          if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-              || inflightInstantsTimeline.containsInstant(instantTime)) {
+        if (logBlock.isDataOrDeleteBlock()) {
+          if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+            // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
+            continue;
+          }
+          if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || inflightInstantsTimeline.containsInstant(instantTime)) {
             // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
             continue;
           }
@@ -450,12 +447,11 @@ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessin
           totalCorruptBlocks.incrementAndGet();
           continue;
         }
-        if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
-            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
-          // hit a block with instant time greater than should be processed, stop processing further
-          break;
-        }
-        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+        if (logBlock.isDataOrDeleteBlock()) {
+          if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+            // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
+            continue;
+          }
           if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
               || inflightInstantsTimeline.containsInstant(instantTime)) {
             // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
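
Both scan paths change in the same way: a data or delete block whose instant time is newer than the reader's latestInstantTime is now skipped with continue instead of terminating the scan with break, while command (rollback) blocks are evaluated regardless of their instant time. A sketch of the per-block decision for the HUDI-8248 scenario exercised by the new tests; blocksInLogFile and the hard-coded "400" are illustrative only, not reader API:

    // Blocks in append order: data@400 (later rolled back), rollback-command@500 targeting 400, data@400 (re-attempt).
    // Reader configured with latestInstantTime = "400".
    for (HoodieLogBlock logBlock : blocksInLogFile) {
      if (logBlock.isDataOrDeleteBlock()) {
        String blockInstant = logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
        if (HoodieTimeline.compareTimestamps(blockInstant, HoodieTimeline.GREATER_THAN, "400")) {
          continue; // newer data/delete block: skip it but keep scanning (the old code would break here)
        }
        // ... committed/uncommitted checks follow, then the block is queued for processing
      }
      // Command blocks such as the rollback@500 are never filtered on instant time, so the rollback
      // of instant 400 is still applied even though 500 > latestInstantTime.
    }
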
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index f98097e39d5c5..23a918cee1226 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -95,6 +95,10 @@ public byte[] getMagic() {
 
   public abstract HoodieLogBlockType getBlockType();
 
+  public boolean isDataOrDeleteBlock() {
+    return getBlockType().isDataOrDeleteBlock();
+  }
+
   public long getLogBlockLength() {
     throw new HoodieException("No implementation was provided");
   }
@@ -160,6 +164,13 @@ public enum HoodieLogBlockType {
     public static HoodieLogBlockType fromId(String id) {
       return ID_TO_ENUM_MAP.get(id);
     }
+
+    /**
+     * @return true if the log block type refers to a data or delete block, false otherwise.
+     */
+    public boolean isDataOrDeleteBlock() {
+      return this != HoodieLogBlockType.COMMAND_BLOCK && this != HoodieLogBlockType.CORRUPT_BLOCK;
+    }
   }
 
   /**
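
The new predicate is written as an exclusion list, so any future data-carrying block type is treated as a data or delete block by default; only command and corrupt blocks are exempt from the reader's instant-time filter. Expected classification for the current enum values (this mirrors the new unit test added to TestHoodieLogFormat below):

    // isDataOrDeleteBlock() == true  : AVRO_DATA_BLOCK, HFILE_DATA_BLOCK, PARQUET_DATA_BLOCK, CDC_DATA_BLOCK, DELETE_BLOCK
    // isDataOrDeleteBlock() == false : COMMAND_BLOCK, CORRUPT_BLOCK
    boolean filteredOnInstantTime = HoodieLogBlockType.AVRO_DATA_BLOCK.isDataOrDeleteBlock(); // true
    boolean exemptFromFilter = HoodieLogBlockType.COMMAND_BLOCK.isDataOrDeleteBlock();        // false
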
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 7c282536423da..457b614f82a96 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -60,6 +60,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
@@ -179,6 +180,20 @@ public void tearDown() throws IOException {
     storage.deleteDirectory(new StoragePath(spillableBasePath));
   }
 
+  @Test
+  public void testHoodieLogBlockTypeIsDataOrDeleteBlock() {
+    List<HoodieLogBlockType> dataOrDeleteBlocks = new ArrayList<>();
+    dataOrDeleteBlocks.add(HoodieLogBlockType.DELETE_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.AVRO_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.PARQUET_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.HFILE_DATA_BLOCK);
+    dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK);
+
+    Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> {
+      assertEquals(dataOrDeleteBlocks.contains(logBlockType), logBlockType.isDataOrDeleteBlock());
+    });
+  }
+
   @Test
   public void testEmptyLog() throws IOException {
     Writer writer =
@@ -647,23 +662,185 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
                                                   boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
-    // Generate 4 delta-log files w/ random records
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     SchemaTestUtil testUtil = new SchemaTestUtil();
+    appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan,
+        "100");
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimizedLogScan) throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+
+    Pair<List<IndexedRecord>, Set<HoodieLogFile>> firstBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
+        "100");
+
+    // trigger another batch of writes for the next commit
+    Pair<List<IndexedRecord>, Set<HoodieLogFile>> secondBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
+        "200", firstBatch.getKey(), firstBatch.getValue());
+
+    List<IndexedRecord> firstAndSecondBatch = new ArrayList<>(firstBatch.getKey());
+    firstAndSecondBatch.addAll(secondBatch.getKey());
+
+    // collect all log files across the first two batches
+    List<HoodieLogFile> allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue()));
+
+    // expect records only from the first batch when max commit time is set to 100.
+    readAndValidate(schema, "100", allLogFiles, firstBatch.getKey());
+
+    // add another batch.
+    Pair<List<IndexedRecord>, Set<HoodieLogFile>> thirdBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
+        "300", firstAndSecondBatch, new HashSet<>(allLogFiles));
+
+    allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue(), thirdBatch.getValue()));
+
+    // validate the records returned for max commit times 100, 200 and 300
+    readAndValidate(schema, "100", allLogFiles, firstBatch.getKey());
+    readAndValidate(schema, "200", allLogFiles, firstAndSecondBatch);
+    List<IndexedRecord> allBatches = new ArrayList<>(firstAndSecondBatch);
+    allBatches.addAll(thirdBatch.getKey());
+    readAndValidate(schema, "300", allLogFiles, allBatches);
+
+    // add a rollback of commit 200
+    addRollbackBlock("400", "200");
+
+    // let's not remove commit 200 from the timeline. Still, due to the presence of the rollback block, the 2nd batch should be ignored.
+    List<IndexedRecord> firstAndThirdBatch = new ArrayList<>(firstBatch.getKey());
+    firstAndThirdBatch.addAll(thirdBatch.getKey());
+    readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch);
+
+    // if we set maxCommitTime to 200 (which is rolled back), expected records are just from batch 1
+    readAndValidate(schema, "200", allLogFiles, firstBatch.getKey());
+
+    // let's repeat the same after removing the commit from the timeline.
+    FileCreateUtils.deleteDeltaCommit(basePath, "200", storage);
+    readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch);
+    // if we set maxCommitTime to 200 (which is a rolled back commit), expected records are just from batch 1
+    readAndValidate(schema, "200", allLogFiles, firstBatch.getKey());
+
+    // let's test the rollback issue from HUDI-8248:
+    // add commit 400 (batch 4), add a rollback block with commit time 500 which rolls back 400, and again add log files with commit time 400 (batch 5).
+    // when we read all log files w/ max commit time 400, batch 4 needs to be ignored and only batch 5 should be read.
+    // trigger another batch of writes for the next commit
+    Pair<List<IndexedRecord>, Set<HoodieLogFile>> fourthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
+        "400", firstAndThirdBatch, new HashSet<>(allLogFiles));
+
+    // let's delete commit 400 from the timeline to simulate a crash.
+    FileCreateUtils.deleteDeltaCommit(basePath, "400", storage);
+
+    // set max commit time as 400 and validate that only the first and 3rd batches are read. The 2nd batch is rolled back completely and the 4th batch is a partially failed commit.
+    allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue()));
+    readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch);
+
+    // let's add the rollback block
+    addRollbackBlock("500", "400");
+    // let's redo the read test
+    readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch);
+
+    // and let's re-add new log files w/ commit time 400.
+    Pair<List<IndexedRecord>, Set<HoodieLogFile>> fifthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
+        "400", firstBatch.getKey(), firstBatch.getValue());
+
+    // let's redo the read test. This time, the first, 3rd and fifth batches should be expected.
+    allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue(), fifthBatch.getValue()));
+    List<IndexedRecord> firstThirdFifthBatch = new ArrayList<>(firstAndThirdBatch);
+    firstThirdFifthBatch.addAll(fifthBatch.getKey());
+    readAndValidate(schema, "400", allLogFiles, firstThirdFifthBatch);
+
+    // even setting a very high value for max commit time should not matter.
+    readAndValidate(schema, "600", allLogFiles, firstThirdFifthBatch);
+  }
+
+  private void addRollbackBlock(String rollbackCommitTime, String commitToRollback) throws IOException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+
+    // Append a rollback command block targeting the given commit.
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackCommitTime);
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commitToRollback);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+    HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
+    writer.appendBlock(commandBlock);
+    writer.close();
+  }
+
+  private List<HoodieLogFile> getSortedLogFilesList(List<Set<HoodieLogFile>> logFilesSets) {
+    Set<HoodieLogFile> allLogFiles = new HashSet<>();
+    logFilesSets.forEach(logfileSet -> allLogFiles.addAll(logfileSet));
+    List<HoodieLogFile> allLogFilesList = new ArrayList<>(allLogFiles);
+    Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator());
+    return allLogFilesList;
+  }
+
+  private void readAndValidate(Schema schema, String maxCommitTime, List<HoodieLogFile> logFiles, List<IndexedRecord> expectedRecords) throws IOException {
+    try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withStorage(storage)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime(maxCommitTime)
+        .withMaxMemorySizeInBytes(10240L)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
+        .withBitCaskDiskMapCompressionEnabled(false)
+        .withOptimizedLogBlocksScan(false)
+        .build()) {
+
+      List<IndexedRecord> scannedRecords = new ArrayList<>();
+      for (HoodieRecord record : scanner) {
+        scannedRecords.add((IndexedRecord)
+            ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+      }
+
+      assertEquals(sort(expectedRecords), sort(scannedRecords),
+          "Scanner records count should be the same as appended records");
+    }
+  }
+
+  private Pair<List<IndexedRecord>, Set<HoodieLogFile>> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType,
+                                                                          boolean isCompressionEnabled,
+                                                                          boolean enableOptimizedLogBlocksScan,
+                                                                          String commitTime) throws IOException, URISyntaxException, InterruptedException {
+    return appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, commitTime,
+        Collections.emptyList(), Collections.emptySet());
+  }
+
+  private Pair<List<IndexedRecord>, Set<HoodieLogFile>> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType,
+                                                                          boolean isCompressionEnabled,
+                                                                          boolean enableOptimizedLogBlocksScan,
+                                                                          String commitTime,
+                                                                          List<IndexedRecord> prevGenRecords, Set<HoodieLogFile> prevLogFiles) throws IOException,
+      URISyntaxException, InterruptedException {
+
+    // Generate 4 delta-log files w/ random records
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
+    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4, commitTime);
 
-    Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4);
+    Set<HoodieLogFile> allLogFiles = new HashSet<>();
+    allLogFiles.addAll(logFiles);
+    allLogFiles.addAll(prevLogFiles);
+    List<HoodieLogFile> allLogFilesList = new ArrayList<>(allLogFiles);
+    Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator());
 
-    FileCreateUtils.createDeltaCommit(basePath, "100", storage);
+    FileCreateUtils.createDeltaCommit(basePath, commitTime, storage);
 
     // scan all log blocks (across multiple log files)
     HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
         .withStorage(storage)
         .withBasePath(basePath)
         .withLogFilePaths(
-            logFiles.stream()
+            allLogFilesList.stream()
                 .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
         .withReaderSchema(schema)
-        .withLatestInstantTime("100")
+        .withLatestInstantTime(commitTime)
         .withMaxMemorySizeInBytes(10240L)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
@@ -679,9 +856,13 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
           ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
     }
 
-    assertEquals(sort(genRecords), sort(scannedRecords),
+    List<IndexedRecord> allGenRecords = new ArrayList<>(genRecords);
+    allGenRecords.addAll(prevGenRecords);
+
+    assertEquals(sort(allGenRecords), sort(scannedRecords),
         "Scanner records count should be the same as appended records");
     scanner.close();
+    return Pair.of(genRecords, logFiles);
   }
 
   @ParameterizedTest
@@ -2727,21 +2908,18 @@ private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
                                                   List<IndexedRecord> records,
                                                   int numFiles)
       throws IOException, InterruptedException {
-    return writeLogFiles(partitionPath, schema, records, numFiles, false);
+    return writeLogFiles(partitionPath, schema, records, numFiles, "100");
   }
 
   private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
                                                   Schema schema,
                                                   List<IndexedRecord> records,
-                                                  int numFiles,
-                                                  boolean enableBlockSequenceNumbers)
-      throws IOException, InterruptedException {
-    int blockSeqNo = 0;
+                                                  int numFiles, String commitTime) throws IOException, InterruptedException {
     Writer writer =
        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
            .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withStorage(storage).build();
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
 
     Set<HoodieLogFile> logFiles = new HashSet<>();
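
A note on the mechanics the new test helpers rely on: writeLogFiles(...) only tags log blocks with an instant time; the scanner treats those blocks as committed once a completed deltacommit with that instant exists on the timeline, and deleting that deltacommit file (as the test does for "200" and "400") turns the same blocks back into uncommitted data that the reader must ignore. Minimal sketch, with the instant "500" and the records list made up for illustration:

    Set<HoodieLogFile> files = writeLogFiles(partitionPath, schema, records, 4, "500"); // blocks tagged with instant 500
    // not yet visible to the scanner: there is no completed deltacommit for 500
    FileCreateUtils.createDeltaCommit(basePath, "500", storage);  // now committed and readable
    FileCreateUtils.deleteDeltaCommit(basePath, "500", storage);  // back to "uncommitted", e.g. to simulate a crashed write
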
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index e53cfce8ece78..8b39d1f39bdea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -25,10 +25,13 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.spark.sql.Dataset;
@@ -42,6 +45,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -73,6 +77,13 @@ private static Stream<Arguments> getTableTypeAndIndexType() {
     );
   }
 
+  private static Stream<Arguments> getTableTypeAndIndexTypeUpdateOrDelete() {
+    return Stream.of(
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, true),
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX, false)
+    );
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -129,6 +140,87 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
     }
   }
 
+  /**
+   * Tests rollback handling when a global index update (or delete) moves records across partitions.
+   * @throws IOException
+   */
+  @ParameterizedTest
+  @MethodSource("getTableTypeAndIndexTypeUpdateOrDelete")
+  public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException {
+    final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
+    final int totalRecords = 8;
+    final String p1 = "p1";
+    final String p2 = "p2";
+    final String p3 = "p3";
+    List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
+    List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      // 1st batch: inserts
+      String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commitTimeAtEpoch0);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());
+
+      // 2nd batch: update 4 records from p1 to p2
+      String commitTimeAtEpoch5 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commitTimeAtEpoch5);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch5).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+      // simulate a crash: delete the latest completed delta commit from the timeline.
+      String latestCompletedDeltaCommit = metaClient.reloadActiveTimeline().getCommitsAndCompactionTimeline().lastInstant().get().getFileName();
+      metaClient.getStorage().deleteFile(new StoragePath(metaClient.getBasePath() + "/.hoodie/" + latestCompletedDeltaCommit));
+    }
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      // re-ingest the same batch
+      String commitTimeAtEpoch10 = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(commitTimeAtEpoch10);
+      if (isUpsert) {
+        assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect());
+        // this also tests the snapshot query. We had a bug where the MOR snapshot query ignored rollbacks while determining the last instant for reading log records.
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
+      } else {
+        assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch10).collect());
+        readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+        readTableAndValidate(metaClient, new int[] {}, p2, 0);
+      }
+
+      // upsert test: update 4 of them from p2 to p3.
+      // delete test: update 4 of them to p3. these are treated as new inserts since they were deleted, so no changes should be seen w.r.t. p2.
+      String commitTimeAtEpoch15 = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch15);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
+      // for the same bug pointed out earlier (ignoring rollbacks while determining the last instant for reading log records), this tests the HoodieMergedReadHandle.
+      readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
+      readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15);
+
+      // let's move 2 of them back to p1
+      String commitTimeAtEpoch20 = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch20);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect());
+      // for the same bug pointed out earlier (ignoring rollbacks while determining the last instant for reading log records), this tests the HoodieMergedReadHandle.
+      Map<String, Long> expectedTsMap = new HashMap<>();
+      Arrays.stream(new int[] {0, 1}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 20L));
+      Arrays.stream(new int[] {4, 5, 6, 7}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 0L));
+      readTableAndValidate(metaClient, new int[] {0, 1, 4, 5, 6, 7}, p1, expectedTsMap);
+      readTableAndValidate(metaClient, new int[] {2, 3}, p3, 15);
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
   public void testUpdatePartitionsThenDelete(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -246,9 +338,8 @@ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expect
         .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts")
         .cache();
     int expectedCount = expectedIds.length;
-    assertEquals(expectedCount, df.count());
-    assertEquals(expectedCount, df.filter(String.format("pt = '%s'", expectedPartition)).count());
-    Row[] allRows = (Row[]) df.collect();
+    Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'", expectedPartition)).collect();
+    assertEquals(expectedCount, allRows.length);
     for (int i = 0; i < expectedCount; i++) {
       int expectedId = expectedIds[i];
       Row r = allRows[i];
@@ -283,6 +374,8 @@ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType indexT
             .withGlobalBloomIndexUpdatePartitionPath(true)
             .withGlobalSimpleIndexUpdatePartitionPath(true)
             .withRecordIndexUpdatePartitionPath(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4).build())
         .withSchema(SCHEMA_STR)
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .fromProperties(getPayloadProps(payloadClass)).build())
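
The validations above encode the invariant the whole test is about: with a global index and update-partition-path enabled, a record upserted under a new partition must be readable only under that partition, including after the crash, rollback and re-ingest sequence. A minimal sketch of that invariant for the upsert variant after the second batch, not using the test's readTableAndValidate helper; spark(), the "hudi" format load and the pt column follow the harness and schema of this test class and are assumptions here:

    Dataset<Row> df = spark().read().format("hudi").load(metaClient.getBasePath().toString());
    assertEquals(4, df.filter("pt = 'p1'").count()); // keys 4-7 stayed in p1
    assertEquals(4, df.filter("pt = 'p2'").count()); // keys 0-3 are visible only under p2
    assertEquals(8, df.count());                     // no duplicate rows across partitions
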