@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -651,6 +653,9 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
+ cfg.baseFileFormat);
cfg.baseFileFormat = baseFileFormat;
this.cfg.baseFileFormat = baseFileFormat;
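// Validate the writer props against the existing table config so that conflicting settings
// (e.g. a changed record key field) are rejected before the sync proceeds.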
Map<String, String> propsToValidate = new HashMap<>();
properties.get().forEach((k, v) -> propsToValidate.put(k.toString(), v.toString()));
HoodieWriterUtils.validateTableConfig(this.sparkSession,
    org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
} else {
tableType = HoodieTableType.valueOf(cfg.tableType);
if (cfg.baseFileFormat == null) {
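For context, the validation added above is what the new testModifiedTableConfigs case below exercises: writer properties that conflict with the persisted table config (for example a changed record key field) cause the sync to fail. The following is a minimal, illustrative sketch of that style of check; the class name, the IMMUTABLE_PROPS list, and the validate helper are hypothetical, and this is not Hudi's actual validateTableConfig implementation.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableConfigValidationSketch {

  // Hypothetical set of properties treated as fixed once the table exists.
  private static final List<String> IMMUTABLE_PROPS = Arrays.asList(
      "hoodie.datasource.write.recordkey.field",
      "hoodie.datasource.write.keygenerator.class");

  // Reject writer properties that conflict with values already persisted in the table config.
  static void validate(Map<String, String> writerProps, Map<String, String> tableConfig) {
    for (String key : IMMUTABLE_PROPS) {
      String existing = tableConfig.get(key);
      String incoming = writerProps.get(key);
      if (existing != null && incoming != null && !existing.equals(incoming)) {
        throw new IllegalArgumentException(
            "Config conflict for " + key + ": table=" + existing + ", writer=" + incoming);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> tableConfig = new HashMap<>();
    tableConfig.put("hoodie.datasource.write.recordkey.field", "_row_key");

    Map<String, String> writerProps = new HashMap<>();
    writerProps.put("hoodie.datasource.write.recordkey.field", "differentval");

    // Throws IllegalArgumentException: the record key field differs from the table config.
    validate(writerProps, tableConfig);
  }
}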
@@ -612,25 +612,16 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

@@ -664,6 +655,43 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
assertTrue(fieldNames.containsAll(expectedFieldNames));
}

@Test
public void testModifiedTableConfigs() throws Exception {
String tableBasePath = dfsBasePath + "/test_table_modified_configs";

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

// No new data => no commits.
cfg.sourceLimit = 0;
syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

// Add a disallowed config update to the record key field. An exception should be thrown.
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1));
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());


// Perform the upsert again with the original configs; the commit should now go through.
HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
newCfg.sourceLimit = 2000;
newCfg.operation = WriteOperationType.UPSERT;
syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
}

private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
new HoodieDeltaStreamer(cfg, jsc).sync();
TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits);
}

@ParameterizedTest
@MethodSource("schemaEvolArgs")
public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1419,7 +1447,7 @@ public void testNullSchemaProvider() throws Exception {

@Test
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, "MERGE_ON_READ");
@@ -1592,6 +1620,8 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
populateCommonProps(parquetProps, dfsBasePath);
}

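// Explicitly set the key generator class for the test source; this presumably keeps the writer
// props consistent with the table config now that they are validated on every sync.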
parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());

parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server", "false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2142,7 +2172,7 @@ public void testDeletePartitions() throws Exception {
// No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
}

void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
// Initial insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);