Skip to content

Commit bc4966e

Browse files
authored
[HUDI-2484] Fix hive sync mode setting in Deltastreamer (apache#3712)
1 parent 31a301f commit bc4966e

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java

+3
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,9 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
296296
SlashEncodedDayPartitionValueExtractor.class.getName());
297297
hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
298298
DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
299+
if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) {
300+
hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
301+
}
299302
hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
300303
DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
301304
hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),

hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java

+35
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020

2121
import org.apache.hudi.avro.HoodieAvroUtils;
2222
import org.apache.hudi.client.SparkRDDWriteClient;
23+
import org.apache.hudi.common.config.TypedProperties;
2324
import org.apache.hudi.common.model.HoodieRecord;
2425
import org.apache.hudi.common.model.HoodieRecordPayload;
2526
import org.apache.hudi.common.model.WriteOperationType;
2627
import org.apache.hudi.common.util.Option;
2728
import org.apache.hudi.config.HoodieWriteConfig;
2829
import org.apache.hudi.exception.HoodieException;
2930
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
31+
import org.apache.hudi.hive.HiveSyncConfig;
3032
import org.apache.hudi.table.BulkInsertPartitioner;
3133

3234
import org.apache.avro.Conversions;
@@ -41,6 +43,8 @@
4143
import org.junit.jupiter.api.BeforeEach;
4244
import org.junit.jupiter.api.Test;
4345
import org.junit.jupiter.api.extension.ExtendWith;
46+
import org.junit.jupiter.params.ParameterizedTest;
47+
import org.junit.jupiter.params.provider.ValueSource;
4448
import org.mockito.ArgumentCaptor;
4549
import org.mockito.Captor;
4650
import org.mockito.Mock;
@@ -49,13 +53,18 @@
4953
import java.math.BigDecimal;
5054
import java.time.LocalDate;
5155

56+
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
57+
import static org.apache.hudi.hive.ddl.HiveSyncMode.HMS;
5258
import static org.hamcrest.CoreMatchers.containsString;
5359
import static org.hamcrest.CoreMatchers.equalTo;
5460
import static org.hamcrest.CoreMatchers.instanceOf;
5561
import static org.hamcrest.CoreMatchers.is;
5662
import static org.hamcrest.MatcherAssert.assertThat;
5763
import static org.junit.jupiter.api.Assertions.assertEquals;
64+
import static org.junit.jupiter.api.Assertions.assertFalse;
65+
import static org.junit.jupiter.api.Assertions.assertNull;
5866
import static org.junit.jupiter.api.Assertions.assertThrows;
67+
import static org.junit.jupiter.api.Assertions.assertTrue;
5968
import static org.mockito.ArgumentMatchers.any;
6069
import static org.mockito.ArgumentMatchers.anyString;
6170
import static org.mockito.Mockito.times;
@@ -65,6 +74,9 @@
6574
@ExtendWith(MockitoExtension.class)
6675
public class TestDataSourceUtils {
6776

77+
private static final String HIVE_DATABASE = "testdb1";
78+
private static final String HIVE_TABLE = "hive_trips";
79+
6880
@Mock
6981
private SparkRDDWriteClient hoodieWriteClient;
7082

@@ -199,6 +211,29 @@ public void testCreateRDDCustomColumnsSortPartitionerWithValidPartitioner() thro
199211
assertThat(partitioner.isPresent(), is(true));
200212
}
201213

214+
@ParameterizedTest
215+
@ValueSource(booleans = {true, false})
216+
public void testBuildHiveSyncConfig(boolean useSyncMode) {
217+
TypedProperties props = new TypedProperties();
218+
if (useSyncMode) {
219+
props.setProperty(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), HMS.name());
220+
props.setProperty(DataSourceWriteOptions.HIVE_USE_JDBC().key(), String.valueOf(false));
221+
}
222+
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), HIVE_DATABASE);
223+
props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), HIVE_TABLE);
224+
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name());
225+
226+
if (useSyncMode) {
227+
assertFalse(hiveSyncConfig.useJdbc);
228+
assertEquals(HMS.name(), hiveSyncConfig.syncMode);
229+
} else {
230+
assertTrue(hiveSyncConfig.useJdbc);
231+
assertNull(hiveSyncConfig.syncMode);
232+
}
233+
assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName);
234+
assertEquals(HIVE_TABLE, hiveSyncConfig.tableName);
235+
}
236+
202237
private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
203238
config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
204239
.withUserDefinedBulkInsertPartitionerClass(partitionerClassName)

0 commit comments

Comments (0)