diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndex.java
index 085c0d9b1cc68..45094546b5b65 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndex.java
@@ -71,6 +71,30 @@ public static SparkHoodieIndex createIndex(HoodieWriteConfig config) {
     }
   }
 
+  /**
+   * Whether the index is global or not.
+   * @param config HoodieWriteConfig to use.
+   * @return {@code true} if the index is a global one, else {@code false}.
+   */
+  public static boolean isGlobalIndex(HoodieWriteConfig config) {
+    switch (config.getIndexType()) {
+      case HBASE:
+        return true;
+      case INMEMORY:
+        return true;
+      case BLOOM:
+        return false;
+      case GLOBAL_BLOOM:
+        return true;
+      case SIMPLE:
+        return false;
+      case GLOBAL_SIMPLE:
+        return true;
+      default:
+        return createIndex(config).isGlobal();
+    }
+  }
+
   @Override
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
index c5ea8fe2651d2..6f8557f64a4ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi;
 
+import static org.apache.spark.sql.functions.callUDF;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -35,16 +41,8 @@
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import scala.collection.JavaConverters;
 
-import static org.apache.spark.sql.functions.callUDF;
-
 /**
  * Helper class to assist in preparing {@link Dataset}s for bulk insert with datasource implementation.
 */
@@ -69,7 +67,8 @@ public class HoodieDatasetBulkInsertHelper {
    */
   public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
       HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
-      BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows) {
+      BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows,
+      boolean isGlobalIndex) {
     List<Column> originalFields =
         Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList());
@@ -100,9 +99,15 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlConte
             functions.lit("").cast(DataTypes.StringType))
         .withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
             functions.lit("").cast(DataTypes.StringType));
+
+    Dataset<Row> dedupedDf = rowDatasetWithHoodieColumns;
+    if (config.shouldCombineBeforeInsert()) {
+      dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(rowDatasetWithHoodieColumns, config.getPreCombineField(), isGlobalIndex);
+    }
+
     List<Column> orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new),
         originalFields.stream()).collect(Collectors.toList());
-    Dataset<Row> colOrderedDataset = rowDatasetWithHoodieColumns.select(
+    Dataset<Row> colOrderedDataset = dedupedDf.select(
         JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq());
 
     return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism());
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
new file mode 100644
index 0000000000000..6f5dd3713d74f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+/**
+ * Helper class to assist in deduplicating Rows for BulkInsert with Rows.
+ */
+public class SparkRowWriteHelper {
+
+  private SparkRowWriteHelper() {
+  }
+
+  private static class WriteHelperHolder {
+    private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
+  }
+
+  public static SparkRowWriteHelper newInstance() {
+    return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
+  }
+
+  public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
+    ExpressionEncoder<Row> encoder = getEncoder(inputDf.schema());
+
+    return inputDf.groupByKey(
+        (MapFunction<Row, String>) value ->
+            isGlobalIndex ? (value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)) :
+                (value.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Encoders.STRING())
+        .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
+              if (((Comparable) v1.getAs(preCombineField)).compareTo(((Comparable) v2.getAs(preCombineField))) >= 0) {
+                return v1;
+              } else {
+                return v2;
+              }
+            }
+        ).map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);
+  }
+
+  private ExpressionEncoder<Row> getEncoder(StructType schema) {
+    List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+        .map(Attribute::toAttribute).collect(Collectors.toList());
+    return RowEncoder.apply(schema)
+        .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
+            SimpleAnalyzer$.MODULE$);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3da3ddd1bebfd..76bc99b250bd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory
 import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.index.SparkHoodieIndex
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.sync.common.AbstractSyncTool
@@ -345,8 +346,9 @@ object HoodieSparkSqlWriter {
       }
       val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted();
       parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
+      val isGlobalIndex = SparkHoodieIndex.isGlobalIndex(writeConfig)
       val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
-        bulkInsertPartitionerRows)
+        bulkInsertPartitionerRows, isGlobalIndex)
       if (SPARK_VERSION.startsWith("2.")) {
         hoodieDF.write.format("org.apache.hudi.internal")
           .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
index d3f0f984a4ff1..e3ac0276ef227 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieDatasetBulkInsertHelper.java
@@ -25,15 +25,32 @@
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.avro.Schema;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,13 +62,22 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
 
   private String schemaStr;
-  private Schema schema;
+  private transient Schema schema;
   private StructType structType;
 
   public TestHoodieDatasetBulkInsertHelper() throws IOException {
     init();
   }
 
+  /**
+   * args for the preCombine test.
+   */
+  private static Stream<Arguments> providePreCombineArgs() {
+    return Stream.of(
+        Arguments.of(false),
+        Arguments.of(true));
+  }
+
   private void init() throws IOException {
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
     schema = DataSourceTestUtils.getStructTypeExampleSchema();
@@ -59,12 +85,12 @@ private void init() throws IOException {
   }
 
   @Test
-  public void testBulkInsertHelper() throws IOException {
-    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).build();
+  public void testBulkInsertHelper() {
+    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(false, false).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
-    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace",
-        new NonSortPartitionerWithRows());
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
+        "testNamespace", new NonSortPartitionerWithRows(), false);
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -74,6 +100,42 @@ public void testBulkInsertHelper() throws IOException {
       assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
     }
 
+    result.toJavaRDD().foreach(entry -> {
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs("_row_key")));
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
+      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
+    });
+
+    Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    assertTrue(dataset.except(trimmedOutput).count() == 0);
+  }
+
+  @ParameterizedTest
+  @MethodSource("providePreCombineArgs")
+  public void testBulkInsertPreCombine(boolean enablePreCombine) {
+    HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(enablePreCombine, enablePreCombine)
+        .withPreCombineField("ts").build();
+    List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
+    Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
+    List<Row> updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset);
+    List<Row> rows = new ArrayList<>();
+    rows.addAll(inserts);
+    rows.addAll(updates);
+    Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
+        "testNamespace", new NonSortPartitionerWithRows(), false);
+    StructType resultSchema = result.schema();
+
+    assertEquals(result.count(), enablePreCombine ? 10 : 15);
+    assertEquals(resultSchema.fieldNames().length, structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size());
+
+    for (Map.Entry<String, Integer> entry : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
+      assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
+    }
+
     int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
     int metadataParitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
     int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -87,6 +149,30 @@ public void testBulkInsertHelper() throws IOException {
       assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
       assertTrue(entry.get(metadataFilenameIndex).equals(""));
     });
+
+    Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+
+    // find resolved input snapshot
+    ExpressionEncoder<Row> encoder = getEncoder(dataset.schema());
+    if (enablePreCombine) {
+      Dataset<Row> inputSnapshotDf = dataset.groupByKey(
+          (MapFunction<Row, String>) value -> value.getAs("partition") + "+" + value.getAs("_row_key"), Encoders.STRING())
+          .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
+            long ts1 = v1.getAs("ts");
+            long ts2 = v2.getAs("ts");
+            if (ts1 >= ts2) {
+              return v1;
+            } else {
+              return v2;
+            }
+          })
+          .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);
+
+      assertTrue(inputSnapshotDf.except(trimmedOutput).count() == 0);
+    } else {
+      assertTrue(dataset.except(trimmedOutput).count() == 0);
+    }
   }
 
   private Map<String, String> getPropsAllSet() {
@@ -120,7 +206,7 @@ public void testNoPropsSet() {
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows());
+          "testNamespace", new NonSortPartitionerWithRows(), false);
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -131,7 +217,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows());
+          "testNamespace", new NonSortPartitionerWithRows(), false);
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -142,7 +228,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows());
+          "testNamespace", new NonSortPartitionerWithRows(), false);
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
@@ -153,10 +239,18 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
-          "testNamespace", new NonSortPartitionerWithRows());
+          "testNamespace", new NonSortPartitionerWithRows(), false);
       fail("Should have thrown exception");
     } catch (Exception e) {
       // ignore
     }
   }
+
+  private ExpressionEncoder<Row> getEncoder(StructType schema) {
+    List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+        .map(Attribute::toAttribute).collect(Collectors.toList());
+    return RowEncoder.apply(schema)
+        .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
+            SimpleAnalyzer$.MODULE$);
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index ebf23242a5279..bdb19bd3d505f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.avro.Schema;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 
@@ -75,6 +76,20 @@ public static List<Row> generateUpdates(List<Row> records, int count) {
     return toReturn;
   }
 
+  public static List<Row> getUniqueRows(List<Row> inserts, int count) {
+    List<Row> toReturn = new ArrayList<>();
+    int soFar = 0;
+    int curIndex = 0;
+    while (soFar < count) {
+      if (!toReturn.contains(inserts.get(curIndex))) {
+        toReturn.add(inserts.get(curIndex));
+        soFar++;
+      }
+      curIndex++;
+    }
+    return toReturn;
+  }
+
   public static List<Row> generateRandomRowsEvolvedSchema(int count) {
     Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
@@ -89,4 +104,18 @@ public static List<Row> generateRandomRowsEvolvedSchema(int count) {
     }
     return toReturn;
   }
+
+  public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
+    Random random = new Random();
+    List<Row> input = inputDf.collectAsList();
+    List<Row> rows = new ArrayList<>();
+    for (Row row : input) {
+      Object[] values = new Object[3];
+      values[0] = row.getAs("_row_key");
+      values[1] = row.getAs("partition");
+      values[2] = ((Long) row.getAs("ts")) + random.nextInt(1000);
+      rows.add(RowFactory.create(values));
+    }
+    return rows;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 268660375c7e4..d37dac444cf39 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -144,7 +144,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(1000)
+      val inserts = DataSourceTestUtils.generateRandomRows(1000)
+
+      // add some updates so that preCombine kicks in
+      val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
+      val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
+      val records = inserts.union(updates)
+
       val recordsSeq = convertRowListToSeq(records)
       val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
@@ -161,6 +167,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       // fetch all records from parquet files generated from write to hudi
       val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val resultRows = actualDf.collectAsList()
 
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -448,9 +455,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     .foreach(tableType => {
       test("test schema evolution for " + tableType) {
         initSparkContext("test_schema_evolution")
-        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol")
        try {
-          val hoodieFooTableName = "hoodie_foo_tbl_" + tableType
+          val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType
           //create a new table
           val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
             HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
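Note for reviewers: stripped of the Hudi plumbing, the core of this patch is a groupByKey/reduceGroups pass over a Dataset<Row> that keeps, per key, the row with the larger preCombine value. With a non-global index the key is partitionPath + "+" + recordKey, so duplicate record keys in different partitions both survive; with a global index only the record key is used (which is what the new isGlobalIndex flag selects). The following standalone sketch is illustrative only and is not part of the patch: the class name RowDedupSketch, the toy schema, and the sample values are assumptions, and it targets the Spark 2.4/3.0-era RowEncoder API used by SparkRowWriteHelper above.

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class RowDedupSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("row-dedup-sketch").getOrCreate();

    // Toy schema standing in for a dataset that already carries the Hudi meta columns.
    StructType schema = new StructType(new StructField[] {
        new StructField("_hoodie_record_key", DataTypes.StringType, false, Metadata.empty()),
        new StructField("_hoodie_partition_path", DataTypes.StringType, false, Metadata.empty()),
        new StructField("ts", DataTypes.LongType, false, Metadata.empty())
    });

    Dataset<Row> input = spark.createDataFrame(Arrays.asList(
        RowFactory.create("key1", "2021/01/01", 1L),
        RowFactory.create("key1", "2021/01/01", 5L),   // duplicate key; the higher ts should win
        RowFactory.create("key2", "2021/01/02", 3L)),
        schema);

    boolean isGlobalIndex = false;  // with a global index only the record key would be used as the grouping key
    Dataset<Row> deduped = input
        .groupByKey((MapFunction<Row, String>) row ->
            isGlobalIndex
                ? row.getAs("_hoodie_record_key")
                : row.getAs("_hoodie_partition_path") + "+" + row.getAs("_hoodie_record_key"),
            Encoders.STRING())
        // keep the row whose preCombine field ("ts" here) is largest within each group
        .reduceGroups((ReduceFunction<Row>) (r1, r2) ->
            ((Long) r1.getAs("ts")) >= ((Long) r2.getAs("ts")) ? r1 : r2)
        .map((MapFunction<Tuple2<String, Row>, Row>) kv -> kv._2, RowEncoder.apply(schema));

    deduped.show(false);  // key1 keeps ts=5, key2 is unchanged
    spark.stop();
  }
}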