@@ -240,7 +240,7 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
final Option<String> instantTime = hoodieTable
.getMetaClient()
.getCommitsTimeline()
.getActiveTimeline() // we need to include all actions, and only completed instants
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
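
For context, a minimal sketch (assuming the standard HoodieTableMetaClient API) of the difference between the two timelines involved in this change; the commits timeline exposes only commit-type actions, while the active timeline also reflects completed rollbacks, compactions and other table-service actions:

// Illustrative only, not part of the patch.
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();

// Only COMMIT / DELTA_COMMIT / REPLACE_COMMIT actions are visible here, so a
// rollback that completed after the last commit does not advance this instant.
Option<String> lastCommitTime = metaClient.getCommitsTimeline()
    .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);

// Every completed action (including rollbacks) is visible here; this is what
// getExistingRecords now uses to pick the instant for reading existing records.
Option<String> lastCompletedTime = metaClient.getActiveTimeline()
    .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);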
@@ -254,11 +254,10 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (logBlock.getBlockType() != CORRUPT_BLOCK
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
if (logBlock.isDataOrDeleteBlock() && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
// hit a data block with an instant time greater than the latest instant to process; skip this block and keep scanning
continue;
}
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
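
A simplified sketch of the filtering this hunk establishes (names taken from the surrounding code, control flow condensed): the instant-time check now applies only to data/delete blocks, and a block newer than latestInstantTime is skipped rather than terminating the scan, since later blocks in the same log file may still need to be processed.

// Simplified sketch of the loop body, not the full scanInternalV1 implementation.
while (logFormatReaderWrapper.hasNext()) {
  HoodieLogBlock logBlock = logFormatReaderWrapper.next();
  String blockInstantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
  boolean newerThanLatest = !HoodieTimeline.compareTimestamps(
      blockInstantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime);
  // The time filter is applied only to data/delete blocks, so command blocks
  // (e.g. rollback markers) are never skipped by it.
  if (logBlock.isDataOrDeleteBlock() && newerThanLatest) {
    // Skip just this block instead of breaking: blocks are not guaranteed to
    // appear in increasing instant-time order once rollbacks are involved.
    continue;
  }
  // ... remaining block handling (corrupt/command checks, record merging) ...
}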
@@ -95,6 +95,10 @@ public byte[] getMagic() {

public abstract HoodieLogBlockType getBlockType();

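/**
* Returns true for blocks that carry records, i.e. any block type other than a
* command block or a corrupt block.
*/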
public boolean isDataOrDeleteBlock() {
return getBlockType() != HoodieLogBlockType.COMMAND_BLOCK && getBlockType() != HoodieLogBlockType.CORRUPT_BLOCK;
}

public long getLogBlockLength() {
throw new HoodieException("No implementation was provided");
}
@@ -25,14 +25,19 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -42,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -73,6 +79,19 @@ private static Stream<Arguments> getTableTypeAndIndexType() {
);
}

private static Stream<Arguments> getTableTypeAndIndexTypeUpdateOrDelete() {
return Stream.of(
Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE, true),
Arguments.of(COPY_ON_WRITE, RECORD_INDEX, true),
Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE, false),
Arguments.of(COPY_ON_WRITE, RECORD_INDEX, false),
Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE, true),
Arguments.of(MERGE_ON_READ, RECORD_INDEX, true),
Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE, false),
Arguments.of(MERGE_ON_READ, RECORD_INDEX, false)
);
}

@ParameterizedTest
@MethodSource("getTableTypeAndIndexType")
public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -129,6 +148,92 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
}
}

/**
* Tests that partition updates and deletes driven by a global index (GLOBAL_SIMPLE or RECORD_INDEX)
* stay consistent across a simulated crash and the resulting rollback, for both COW and MOR tables:
* the latest completed commit is deleted, the same batch is re-ingested, and reads are validated
* after each step.
*/
@ParameterizedTest
@MethodSource("getTableTypeAndIndexTypeUpdateOrDelete")
public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException {
final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
List<HoodieRecord> updatesAtEpoch5 = null;
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
final int totalRecords = 8;
final String p1 = "p1";
final String p2 = "p2";

// 1st batch: inserts
String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch0);
assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());

// 2nd batch: update 4 records from p1 to p2
String commitTimeAtEpoch5 = HoodieActiveTimeline.createNewInstantTime();
updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch5);
if (isUpsert) {
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
} else {
assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch5).collect());
readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
readTableAndValidate(metaClient, new int[] {}, p2, 0);
}
// simulate a crash by deleting the latest completed commit/delta commit from the timeline
String latestCompletedDeltaCommit = metaClient.reloadActiveTimeline().getCommitsAndCompactionTimeline().lastInstant().get().getFileName();
metaClient.getFs().delete(new Path(metaClient.getBasePathV2() + "/.hoodie/" + latestCompletedDeltaCommit));
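// the next write client is expected to detect the dangling instant and roll it back
// before the re-ingested batch below is applied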

}

try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
final String p1 = "p1";
final String p2 = "p2";
final String p3 = "p3";

// re-ingest same batch
String commitTimeAtEpoch10 = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(commitTimeAtEpoch10);
if (isUpsert) {
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect());
// this also exercises the snapshot query: there was a bug where the MOR snapshot read ignored rollbacks when determining the latest instant for reading log records.
readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5);
} else {
assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch10).collect());
readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
readTableAndValidate(metaClient, new int[] {}, p2, 0);
}

// upsert case: move the same 4 records from p2 to p3.
// delete case: write the same 4 records to p3; since they were previously deleted, they are
// treated as new inserts and nothing changes with respect to p2.
String commitTimeAtEpoch15 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch15);
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
// for the same bug called out above (rollbacks ignored when determining the latest instant for reading log records), this also exercises the HoodieMergedReadHandle.
readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0);
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15);

// let's move 2 of them back to p1
String commitTimeAtEpoch20 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch20);
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect());
// for the same bug called out above (rollbacks ignored when determining the latest instant for reading log records), this also exercises the HoodieMergedReadHandle.
Map<String, Long> expectedTsMap = new HashMap<>();
Arrays.stream(new int[] {0, 1}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 20L));
Arrays.stream(new int[] {4, 5, 6, 7}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 0L));
readTableAndValidate(metaClient, new int[] {0, 1, 4, 5, 6, 7}, p1, expectedTsMap);
readTableAndValidate(metaClient, new int[] {2, 3}, p3, 15);
}
}

@ParameterizedTest
@MethodSource("getTableTypeAndIndexType")
public void testUpdatePartitionsThenDelete(HoodieTableType tableType, IndexType indexType) throws IOException {
@@ -246,9 +351,8 @@ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expect
.select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts")
.cache();
int expectedCount = expectedIds.length;
assertEquals(expectedCount, df.count());
assertEquals(expectedCount, df.filter(String.format("pt = '%s'", expectedPartition)).count());
Row[] allRows = (Row[]) df.collect();
Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'", expectedPartition)).collect();
for (int i = 0; i < expectedCount; i++) {
int expectedId = expectedIds[i];
Row r = allRows[i];
@@ -283,6 +387,8 @@ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType indexT
.withGlobalBloomIndexUpdatePartitionPath(true)
.withGlobalSimpleIndexUpdatePartitionPath(true)
.withRecordIndexUpdatePartitionPath(true).build())
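// assumption: a lower delta-commit threshold so compaction can kick in within the
// handful of commits these tests issue (relevant for MERGE_ON_READ tables only)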
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(4).build())
.withSchema(SCHEMA_STR)
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.fromProperties(getPayloadProps(payloadClass)).build())