Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public String validateFiles(
}

private HoodieWriteConfig getWriteConfig() {
return HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
return HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath).withAutoCommit(true)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps())
HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withAutoCommit(false).withProps(config.getProps())
.forTable(metaClient.getTableConfig().getTableName()).build();
try {
new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
Expand All @@ -592,7 +592,7 @@ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Stri
}

private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
return HoodieWriteConfig.newBuilder().withPath(basePath).withAutoCommit(false)
.withRollbackUsingMarkers(rollbackUsingMarkers)
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public void testMetadataDelete() throws Exception {

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
JavaRDD<HoodieRecord> writeRecords = context().getJavaSparkContext().parallelize(records, 1);
List<WriteStatus> result = client.upsert(writeRecords, newCommitTime).collect();
Assertions.assertNoWriteErrors(result);
JavaRDD<WriteStatus> result = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, result);
Assertions.assertNoWriteErrors(result.collect());
}

// verify that metadata partitions are filled in as part of table config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private void generateCommits() throws IOException {

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withAutoCommit(false)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
Expand All @@ -192,7 +193,8 @@ private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<Hood

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
JavaRDD<WriteStatus> result = operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
client.commit(newCommitTime, result);
return records;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
Expand All @@ -69,6 +70,7 @@
* A command use SparkLauncher need load jars under lib which generate during mvn package.
* Use integration test instead of unit test.
*/
@Disabled("siva-to-fix")
@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"})
public class ITTestCompactionCommand extends HoodieCLIIntegrationTestBase {

Expand Down Expand Up @@ -284,6 +286,7 @@ private void generateCommits() throws IOException {

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withAutoCommit(true)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception {
StoragePath metadataTableBasePath =
new StoragePath(HoodieTableMetadata.getMetadataTableBasePath(HoodieCLI.basePath));
// then bootstrap metadata table at instant 104
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withAutoCommit(false).withPath(HoodieCLI.basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc)).close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private void generateCommits() throws IOException {

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withAutoCommit(false)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
Expand All @@ -206,13 +207,14 @@ private void upsert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload>
List<HoodieRecord> records, String newCommitTime) throws IOException {
client.startCommitWithTime(newCommitTime);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
JavaRDD<WriteStatus> result = operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
client.commit(newCommitTime, result);
}

private void operateFunc(
private JavaRDD<WriteStatus> operateFunc(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
throws IOException {
writeFn.apply(client, writeRecords, commitTime);
return writeFn.apply(client, writeRecords, commitTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> AUTO_COMMIT_ENABLE = ConfigProperty
.key("hoodie.auto.commit")
.defaultValue("true")
.defaultValue("false")
.markAdvanced()
.withDocumentation("Controls whether a write operation should auto commit. This can be turned off to perform inspection"
+ " of the uncommitted write before deciding to commit.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand All @@ -46,6 +47,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;

Expand Down Expand Up @@ -91,7 +93,8 @@ public static void runCompaction(HoodieTable table, HoodieEngineContext context,
try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty());
if (compactionInstantOpt.isPresent()) {
writeClient.compact(compactionInstantOpt.get());
HoodieWriteMetadata result = writeClient.compact(compactionInstantOpt.get());
writeClient.commitCompaction(compactionInstantOpt.get(), (HoodieCommitMetadata) result.getCommitMetadata().get(), Option.empty());
}
}
}
Expand Down Expand Up @@ -203,7 +206,8 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex
if (shouldCompact) {
Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty());
if (compactionInstantOpt.isPresent()) {
writeClient.compact(compactionInstantOpt.get());
HoodieWriteMetadata result = writeClient.compact(compactionInstantOpt.get());
writeClient.commitCompaction(compactionInstantOpt.get(), (HoodieCommitMetadata) result.getCommitMetadata().get(), Option.empty());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void cleanRollsBackFailedWritesWithLazyPolicy(boolean rollbackOccurred) throws I
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
Expand Down Expand Up @@ -116,6 +117,7 @@ void cleanerPlanIsSkippedIfHasInflightClean() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
Expand Down Expand Up @@ -151,6 +153,7 @@ void cleanerPlanIsCalledWithoutInflightClean(boolean generatesPlan) throws IOExc
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
Expand Down Expand Up @@ -199,6 +202,7 @@ void cleanerPlanIsCalledWithInflightCleanAndAllowMultipleCleans() throws IOExcep
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withMetricsConfig(HoodieMetricsConfig.newBuilder()
.on(true)
.withReporterType(MetricsReporterType.INMEMORY.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void startCommitWillRollbackFailedWritesInEagerMode() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieTable<String, String, String, String> table = mock(HoodieTable.class);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
Expand All @@ -85,6 +86,7 @@ void rollbackDelegatesToTableServiceClient() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieTable<String, String, String, String> table = mock(HoodieTable.class);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
Expand All @@ -100,6 +102,7 @@ void testStartCommit() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void testLockAndUnlock(boolean multiWriter) {

private HoodieWriteConfig getMultiWriterWriteConfig() {
return HoodieWriteConfig.newBuilder()
.withAutoCommit(false)
.withPath(basePath)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
Expand All @@ -111,6 +112,7 @@ private HoodieWriteConfig getMultiWriterWriteConfig() {
private HoodieWriteConfig getSingleWriterWriteConfig() {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(ZookeeperBasedLockProvider.class)
.withZkBasePath(ZK_BASE_PATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private void init(TestInfo testInfo) throws IOException {
private HoodieWriteConfig getWriteConfig(boolean useLockProviderWithRuntimeError) {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.fromProperties(props)
.withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.fromProperties(props)
.withIndexType(HoodieIndex.IndexType.SIMPLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void testCreateMetadataWriteConfigForCleaner() {
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(5).build())
.withAutoCommit(false)
.build();

HoodieWriteConfig metadataWriteConfig1 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, HoodieFailedWritesCleaningPolicy.EAGER);
Expand All @@ -54,6 +55,7 @@ public void testCreateMetadataWriteConfigForCleaner() {
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(20).build())
.withAutoCommit(false)
.build();
HoodieWriteConfig metadataWriteConfig2 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2, HoodieFailedWritesCleaningPolicy.EAGER);
assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, metadataWriteConfig2.getFailedWritesCleanPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ void getIndexReturnsCachedInstance() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieEngineContext context = mock(HoodieEngineContext.class);
HoodieTable hoodieTable = new TestBaseHoodieTable(writeConfig, context, metaClient);
Expand All @@ -63,6 +64,7 @@ void getStorageLayoutReturnsCachedInstance() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieEngineContext context = mock(HoodieEngineContext.class);
HoodieTable hoodieTable = new TestBaseHoodieTable(writeConfig, context, metaClient);
Expand All @@ -76,6 +78,7 @@ void testGetEngineContext() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieEngineContext context = mock(HoodieEngineContext.class);
HoodieTable hoodieTable = new TestBaseHoodieTable(writeConfig, context, metaClient);
Expand All @@ -93,6 +96,7 @@ void testRollbackInflightInstant() throws IOException {
initMetaClient();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withAutoCommit(false)
.build();
HoodieEngineContext context = mock(HoodieEngineContext.class);
HoodieTable hoodieTable =
Expand Down
Loading
Loading