72 commits
d1e9df5
Flipping default for auto commit to false to triage issues
nsivabalan Mar 13, 2025
b9a75db
Fixing tests
nsivabalan Mar 16, 2025
070d964
Fixing few more tests
nsivabalan Mar 17, 2025
334937e
Fixing all tests in hudi spark client module
nsivabalan Apr 7, 2025
cecfe42
Fixing few more tests
nsivabalan Apr 7, 2025
c34052c
Fixing tests apr 15
nsivabalan Apr 15, 2025
488b394
Disabling few tests
nsivabalan Apr 15, 2025
c39b5f5
Fixing test failures
nsivabalan Apr 15, 2025
e0938ab
Disabling few more failing tests
nsivabalan Apr 15, 2025
51edc58
Fixing TestHoodieFileSystemViews
nsivabalan Apr 16, 2025
3cae29f
Fixing all scala tests for auto commit
nsivabalan Apr 16, 2025
57da284
minor fix with TestMarkerBasedRollbackStrategy
nsivabalan Apr 16, 2025
ad61280
Disabling a flaky early conflict detection multi write test
nsivabalan Apr 16, 2025
a4a3f4c
Fixing tests in hudi-utilities phase1
nsivabalan Apr 16, 2025
2e3739e
Disabling tests from TestHoodieMultiTableServicesMain
nsivabalan Apr 17, 2025
bdfb215
Fixing tests in TestHoodieClientOnCopyOnWriteStorage
nsivabalan Apr 17, 2025
e5b8b80
Fixing few more tests with TestHoodieClientOnCopyOnWriteStorage
nsivabalan Apr 17, 2025
069bea1
Fixing few more tests
nsivabalan Apr 17, 2025
25bbaa2
Disabling a failing test
nsivabalan Apr 17, 2025
4126836
Adding optimized writes to MDT
nsivabalan Nov 12, 2024
54cb137
Fixing CreateHandle and MergeHandle to generate col stats and stitch …
nsivabalan Nov 12, 2024
2d1c732
Fixing compilation issue
nsivabalan Mar 7, 2025
5d67491
Fixing issue after rebase
nsivabalan Apr 21, 2025
9fb1b68
Fixing write dag to support both old and new ways
nsivabalan Apr 22, 2025
550dd63
Applying changes from lokesh's patch to fix auto commit flows
nsivabalan Apr 22, 2025
dc5f8b3
Fixing compilation issues after rebasing
nsivabalan Apr 22, 2025
a64dc02
Disabling SI by default
nsivabalan Apr 22, 2025
bcf6517
Fixing optimized write dag validated with functional sanity tests
nsivabalan Apr 23, 2025
efbf803
Fixing compilation failure
nsivabalan Apr 23, 2025
71a6179
Flipping auto commit fully
nsivabalan Apr 23, 2025
47d2e19
Fix compilation failures
lokeshj1703 Apr 23, 2025
e7c7698
Fix issues with metadata writer
lokeshj1703 Apr 23, 2025
1336c96
Fix test failure
lokeshj1703 Apr 23, 2025
552b743
Fixing row writer code paths to avoid optimized writes and fixing boo…
nsivabalan Apr 24, 2025
bb7c13a
Disable optimised writes by default
lokeshj1703 Apr 24, 2025
b6f9883
Enable column stats, partition stats and secondary index
lokeshj1703 Apr 24, 2025
7821f95
Fix payload deflation condition, add optimized write condition before…
lokeshj1703 Apr 24, 2025
10cccec
Fix column stats failure for v8 MOR
lokeshj1703 Apr 24, 2025
fa9afdd
Fixing java client tests
nsivabalan Apr 24, 2025
3131366
Fixing cols stats generation
nsivabalan Apr 24, 2025
d8755f7
Fixing RunCompactionProcedure
nsivabalan Apr 25, 2025
2da3fca
Fixing java tests TestHoodieJavaWriteClientInsert
nsivabalan Apr 25, 2025
ad6b496
Fixing integ tests compilation and TestHoodieSparkMergeOnReadTableClu…
nsivabalan Apr 25, 2025
ed9c7b4
Fixing test failures in module FT spark2
nsivabalan Apr 25, 2025
4d7a2bd
Fixing TestHoodieCompactor
nsivabalan Apr 25, 2025
9c3a1ed
Fixing TestAsyncCompaction
nsivabalan Apr 25, 2025
b873f58
Fixing TestMergeOnReadRollbackActionExecutor
nsivabalan Apr 25, 2025
35f5860
Fix test failures in UT client/spark-client
lokeshj1703 Apr 25, 2025
31369ec
Fix checkstyle
lokeshj1703 Apr 25, 2025
24dbcea
Fix compaction related tests, disable bootstrap tests
lokeshj1703 Apr 25, 2025
a5947cf
Fix checkstyle
lokeshj1703 Apr 25, 2025
f28c66f
Fix test failures in test-spark-scala-other-tests
lokeshj1703 Apr 25, 2025
8b1a653
Removing unintended file in flink test
nsivabalan Apr 25, 2025
e7a1b5f
Fixing shouldComplete with compaction
nsivabalan Apr 25, 2025
433e516
Fix error table flow tests
lokeshj1703 Apr 25, 2025
6e75435
Fixing TestSparkDataSourceDAGExecution
nsivabalan Apr 25, 2025
0099b64
Fix async compaction errors with deltastreamer
lokeshj1703 Apr 25, 2025
b7ab221
Fixing tests in Java UT spark-datasource functional package
nsivabalan Apr 25, 2025
1f25fdb
Fixing partition ttl
nsivabalan Apr 25, 2025
de6a0f7
Fixing deletePartition operation
nsivabalan Apr 26, 2025
0138973
Fixing flink compaction
nsivabalan Apr 26, 2025
4fc4167
Reverting disabling a TTL test in spark
nsivabalan Apr 26, 2025
a40a5a1
Fix some tests
lokeshj1703 Apr 26, 2025
0471261
Fixing java tests
nsivabalan Apr 27, 2025
7ac4fea
Fixing tests in TestHoodieFileSystemViews
nsivabalan Apr 27, 2025
3b91996
Fixing bootstrap operation
nsivabalan Apr 27, 2025
1be16e5
fixing flaky test in TestHoodieFileSystemViews
nsivabalan Apr 27, 2025
060a7e5
Fixing complete commit flows
nsivabalan Apr 28, 2025
e91e7bd
Fixing ITTestCompactionCommand
nsivabalan Apr 28, 2025
9f285aa
Fixing bootstrap tests
nsivabalan Apr 28, 2025
9812320
Remove doCommit argument
lokeshj1703 Apr 28, 2025
ecfe180
Removing disabled tests
nsivabalan Apr 28, 2025
Files changed
@@ -569,7 +569,7 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa
         .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
         .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
         .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
-    HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps())
+    HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withAutoCommit(false).withProps(config.getProps())
         .forTable(metaClient.getTableConfig().getTableName()).build();
     try {
       new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
@@ -592,7 +592,7 @@ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Stri
   }

   private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath)
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withAutoCommit(false)
         .withRollbackUsingMarkers(rollbackUsingMarkers)
         .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
             HoodieFailedWritesCleaningPolicy.EAGER).build())
@@ -87,8 +87,9 @@ public void testMetadataDelete() throws Exception {

     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
     JavaRDD<HoodieRecord> writeRecords = context().getJavaSparkContext().parallelize(records, 1);
-    List<WriteStatus> result = client.upsert(writeRecords, newCommitTime).collect();
-    Assertions.assertNoWriteErrors(result);
+    JavaRDD<WriteStatus> result = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, result);
+    Assertions.assertNoWriteErrors(result.collect());
   }

   // verify that metadata partitions are filled in as part of table config.
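Note: the recurring shape of these test fixes follows from flipping auto-commit to false — the write call alone no longer completes the commit, so each test now commits explicitly. A minimal sketch of the pattern, assuming an already-built SparkRDDWriteClient `client` and an input JavaRDD<HoodieRecord> `writeRecords` (both placeholders, as is the instant time):

    String newCommitTime = "0000001";           // hypothetical instant time
    client.startCommitWithTime(newCommitTime);  // open the instant on the timeline
    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
    client.commit(newCommitTime, statuses);     // explicit: upsert() alone now leaves the instant inflight
    Assertions.assertNoWriteErrors(statuses.collect());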
@@ -175,6 +175,7 @@ private void generateCommits() throws IOException {

     // Create the write client to write some records in
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withAutoCommit(false)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2).forTable(tableName)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
@@ -192,7 +193,8 @@ private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<Hood

     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+    JavaRDD<WriteStatus> result = operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+    client.commit(newCommitTime, result);
     return records;
   }

@@ -284,6 +284,7 @@ private void generateCommits() throws IOException {

     // Create the write client to write some records in
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withAutoCommit(false)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2).forTable(tableName)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
@@ -331,13 +332,15 @@ private void delete(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload>
     int numToDelete = records.size() / 2;
     List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
     JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
-    client.delete(deleteRecords, newCommitTime);
+    client.commit(newCommitTime, client.delete(deleteRecords, newCommitTime));
   }

   private JavaRDD<WriteStatus> operateFunc(
       HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
       SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
       throws IOException {
-    return writeFn.apply(client, writeRecords, commitTime);
+    List<WriteStatus> writeStatuses = writeFn.apply(client, writeRecords, commitTime).collect();
+    client.commit(commitTime, jsc.parallelize(writeStatuses));
+    return jsc.parallelize(writeStatuses);
   }
 }
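The collect-then-reparallelize shape above likely exists so the write DAG executes only once: passing the lazy status RDD to commit() and also returning it to the caller would otherwise trigger two executions. An equivalent sketch using RDD caching instead of a driver-side collect (an alternative under that assumption, not what the PR does):

    import org.apache.spark.storage.StorageLevel;

    JavaRDD<WriteStatus> statuses = writeFn.apply(client, writeRecords, commitTime)
        .persist(StorageLevel.MEMORY_ONLY());  // materialize once, reuse for commit and return
    client.commit(commitTime, statuses);
    return statuses;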
@@ -164,7 +164,7 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception {
     StoragePath metadataTableBasePath =
         new StoragePath(HoodieTableMetadata.getMetadataTableBasePath(HoodieCLI.basePath));
     // then bootstrap metadata table at instant 104
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withAutoCommit(false).withPath(HoodieCLI.basePath)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
     SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc)).close();

@@ -184,6 +184,7 @@ private void generateCommits() throws IOException {

     // Create the write client to write some records in
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withAutoCommit(false)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withDeleteParallelism(2)
@@ -206,13 +207,14 @@ private void upsert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload>
       List<HoodieRecord> records, String newCommitTime) throws IOException {
     client.startCommitWithTime(newCommitTime);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
+    JavaRDD<WriteStatus> result = operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
+    client.commit(newCommitTime, result);
   }

-  private void operateFunc(
+  private JavaRDD<WriteStatus> operateFunc(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
       SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
       throws IOException {
-    writeFn.apply(client, writeRecords, commitTime);
+    return writeFn.apply(client, writeRecords, commitTime);
   }
 }
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import org.apache.hudi.client.LeanWriteStatus;
+
+import java.util.List;
+
+public interface WriteStatusHandlerCallback {
+  boolean processWriteStatuses(long totalRecords, long totalErroredRecords, List<LeanWriteStatus> leanWriteStatuses);
+}
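For context, a hypothetical implementation of the new callback; the proceed/abort meaning of the returned boolean and the 1% threshold are assumptions, not taken from the PR:

    package org.apache.hudi.examples;  // hypothetical package

    import org.apache.hudi.callback.common.WriteStatusHandlerCallback;
    import org.apache.hudi.client.LeanWriteStatus;

    import java.util.List;

    // Hypothetical callback: veto the write when more than 1% of records errored.
    public class ErrorThresholdCallback implements WriteStatusHandlerCallback {
      private static final double MAX_ERROR_RATIO = 0.01;  // assumed tolerance

      @Override
      public boolean processWriteStatuses(long totalRecords, long totalErroredRecords,
                                          List<LeanWriteStatus> leanWriteStatuses) {
        // Only the aggregate counts are used; per-status inspection would depend
        // on the LeanWriteStatus API, which this sketch does not assume.
        return totalRecords == 0 || (double) totalErroredRecords / totalRecords <= MAX_ERROR_RATIO;
      }
    }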
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.table.timeline.TimeGenerator;
 import org.apache.hudi.common.table.timeline.TimeGenerators;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -54,7 +55,9 @@
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;

 /**
  * Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
@@ -75,13 +78,15 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
   protected final TransactionManager txnManager;
   private final TimeGenerator timeGenerator;

+
   /**
    * Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
    * able to take advantage of the cached file-system view. New completed actions will be synced automatically in an
    * incremental fashion.
    */
   private transient Option<EmbeddedTimelineService> timelineServer;
   private final boolean shouldStopTimelineServer;
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = new ConcurrentHashMap<>();

   protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
     this(context, clientConfig, Option.empty());
@@ -114,6 +119,20 @@ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig client
   @Override
   public void close() {
     stopEmbeddedServerView(true);
+    // close all metadata writer instances
+    if (metadataWriterMap != null) {
+      metadataWriterMap.entrySet().forEach(entry -> {
+        if (entry.getValue().isPresent()) {
+          try {
+            entry.getValue().get().close();
+          } catch (Exception e) {
+            throw new HoodieException("Failing to close metadata writer instance for " + entry.getKey(), e);
+          }
+        }
+      });
+      metadataWriterMap.clear();
+      metadataWriterMap = null;
+    }
     this.context.setJobStatus("", "");
     this.heartbeatClient.close();
     this.txnManager.close();
@@ -262,6 +281,18 @@ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieW
     }
   }

+  class GetMetadataWriterFunc implements Functions.Function2<String, HoodieTableMetaClient, Option<HoodieTableMetadataWriter>> {
+
+    @Override
+    public Option<HoodieTableMetadataWriter> apply(String triggeringInstantTimestamp, HoodieTableMetaClient metaClient) {
+      return getMetadataWriter(triggeringInstantTimestamp, metaClient);
+    }
+  }
+
+  Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, HoodieTableMetaClient metaClient) {
+    throw new HoodieException("Each engine's write client is expected to implement this method");
+  }
+
   /**
    * Write the HoodieCommitMetadata to metadata table if available.
    *
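BaseHoodieClient now keeps a per-instant cache of metadata writers (metadataWriterMap), leaves getMetadataWriter for each engine's client to implement, and releases every cached writer in close(). A sketch of what a Spark-side override might look like, assuming the subclass has the storage configuration, write config, and engine context at hand (storageConf, writeConfig, and engineContext are placeholders, and the create() signature follows the usage shown in the CLI test diff above):

    @Override
    Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                        HoodieTableMetaClient metaClient) {
      // Cache one writer per triggering instant; ConcurrentHashMap.computeIfAbsent
      // keeps concurrent callers from creating duplicate writers for the same instant.
      return metadataWriterMap.computeIfAbsent(triggeringInstantTimestamp, instant ->
          Option.of(SparkHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, engineContext)));
    }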