@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.table.functional;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClientFunctionalTestHarness {

  @ParameterizedTest
  @ValueSource(booleans = {false, true})
  public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
    HoodieWriteConfig writeConfig = getConfigBuilder(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
        .build();
    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
         HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {

      // 1st write batch; 3 commits for 3 partitions
      String instantTime1 = HoodieActiveTimeline.createNewInstantTime(1000);
      client.startCommitWithTime(instantTime1);
      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1, 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1);
      String instantTime2 = HoodieActiveTimeline.createNewInstantTime(2000);
      client.startCommitWithTime(instantTime2);
      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2, 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2);
      String instantTime3 = HoodieActiveTimeline.createNewInstantTime(3000);
      client.startCommitWithTime(instantTime3);
      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3);

      final HoodieTimeline timeline1 = metaClient.getCommitsTimeline().filterCompletedInstants();
      assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));

      // delete the 1st and the 2nd partition; 1 replace commit
      final String instantTime4 = HoodieActiveTimeline.createNewInstantTime(4000);
      client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
      client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);

      // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
      for (int i = 5; i < 8; i++) {
        String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
        client.startCommitWithTime(instantTime);
        client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
      }

      // verify archived timeline
      metaClient = HoodieTableMetaClient.reload(metaClient);
      final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
      assertTrue(archivedTimeline.containsInstant(instantTime1));
      assertTrue(archivedTimeline.containsInstant(instantTime2));
      assertTrue(archivedTimeline.containsInstant(instantTime3));
      assertTrue(archivedTimeline.containsInstant(instantTime4), "should contain the replace commit.");

      // verify records
      final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
      assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
          "should only have the 4 records from the 3rd partition.");
    }
  }
}
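
Note on the write config used in this new test (editorial sketch, not part of the PR): assuming archiveCommitsWith(min, max) maps to hoodie.keep.min.commits / hoodie.keep.max.commits and retainCommits(n) to hoodie.cleaner.commits.retained, the three commits written after the replace commit push the active timeline past the max of 3, so the archiver trims the oldest instants, including the replace commit, onto the archived timeline, which is exactly what the assertions check. A minimal sketch of those knobs under that assumption:

// Sketch only; the config-key mapping in the comments is an assumption, not verified in this PR.
HoodieCompactionConfig archivalConfig = HoodieCompactionConfig.newBuilder()
    .archiveCommitsWith(2, 3)  // keep at least 2 completed instants; archive once more than 3 accumulate
    .retainCommits(1)          // cleaner retains file versions needed by the latest commit
    .build();
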
@@ -113,7 +113,7 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
-Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
+Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

/*
@@ -122,7 +122,7 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
// we already set small file size to small number to force inserts to go into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
-dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
+dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

if (doUpdates) {
@@ -132,7 +132,7 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
-updateRecords(metaClient, records, client, cfg, newCommitTime);
+updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
}

HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -190,18 +190,18 @@ void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception {
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
-Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
+Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
-dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
+dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
// run updates
if (doUpdates) {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
-updateRecords(metaClient, records, client, cfg, newCommitTime);
+updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
}

HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -96,7 +96,7 @@ public void testIncrementalReadsWithCompaction() throws Exception {
client.startCommitWithTime(commitTime1);

List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
-Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
+Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

// verify only one base file shows up with commit time 001
@@ -118,7 +118,7 @@ public void testIncrementalReadsWithCompaction() throws Exception {
String updateTime = "004";
client.startCommitWithTime(updateTime);
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
-updateRecords(metaClient, records004, client, cfg, updateTime);
+updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);

// verify RO incremental reads - only one base file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
@@ -145,7 +145,7 @@ public void testIncrementalReadsWithCompaction() throws Exception {
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
-dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
+dataFiles = insertRecordsToMORTable(metaClient, records006, client, cfg, insertsTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

// verify new write shows up in snapshot mode even though there is pending compaction
@@ -95,7 +95,7 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
-Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
+Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

/*
@@ -104,7 +104,7 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
-updateRecords(metaClient, records, client, cfg, newCommitTime);
+updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);

String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
@@ -213,7 +213,7 @@ protected JavaRDD<WriteStatus> updateLocation(
index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
}

-protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
+protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);

@@ -242,7 +242,7 @@ protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient,
return dataFilesToRead;
}

-protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);

Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
@@ -76,7 +76,7 @@
* <p>
* Test data uses a toy Uber trips data model.
*/
-public class HoodieTestDataGenerator {
+public class HoodieTestDataGenerator implements AutoCloseable {

// based on examination of sample file, the schema produces the following per record size
public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
@@ -860,6 +860,7 @@ public static class KeyPartition implements Serializable {
public String partitionPath;
}

+@Override
public void close() {
existingKeysBySchema.clear();
}
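
With HoodieTestDataGenerator now implementing AutoCloseable, test code can scope the generator with try-with-resources, as the new archive-with-replace test above already does; close() simply clears the generator's cached keys. A minimal usage sketch, assuming the same test-harness context and static imports as that test:

try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
  // generate records for one partition; close() clears existingKeysBySchema when the block exits
  List<HoodieRecord> inserts = dataGen.generateInsertsForPartition("001", 10, DEFAULT_FIRST_PARTITION_PATH);
}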