Merged
SparkClientFunctionalTestHarness.java

@@ -20,10 +20,12 @@
 package org.apache.hudi.testutils;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -41,7 +43,6 @@
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieIOException;
@@ -108,6 +109,17 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   @TempDir
   protected java.nio.file.Path tempDir;
 
+  public static Map<String, String> getSparkSqlConf() {
+    Map<String, String> sqlConf = new HashMap<>();
+    sqlConf.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+
+    if (HoodieSparkUtils.gteqSpark3_2()) {
+      sqlConf.put("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
+    }
+
+    return sqlConf;
+  }
+
   public String basePath() {
     return tempDir.toAbsolutePath().toUri().toString();
   }
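The two entries returned by getSparkSqlConf() wire Hudi into Spark SQL: spark.sql.extensions loads the Hudi session extension, and spark.sql.catalog.spark_catalog, set only on Spark 3.2 or later, replaces the built-in session catalog with Hudi's HoodieCatalog. A minimal sketch of a session built with these settings (standard SparkSession builder API; the master and app name are placeholders, not part of this change):

import org.apache.spark.sql.SparkSession

// Sketch only: a session carrying the same two settings getSparkSqlConf() returns.
val spark = SparkSession.builder()
  .master("local[2]")              // placeholder for a local test run
  .appName("hudi-sql-conf-sketch") // placeholder app name
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  // Mirrors the gteqSpark3_2() guard above; only valid on Spark 3.2+.
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .getOrCreate()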
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
 import org.apache.spark.HoodieSparkKryoRegistrar$;
@@ -56,6 +57,11 @@ public SparkSession spark() {
     return spark;
   }
 
+  @Override
+  public SparkConf conf() {
+    return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+  }
+
   @Override
   public SQLContext sqlContext() {
     return sqlContext;
TestCOWDataSourceStorage.scala

@@ -29,7 +29,9 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -58,6 +60,8 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
   @ParameterizedTest
   @CsvSource(value = Array(
     "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
TestMORDataSourceStorage.scala

@@ -26,8 +26,10 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.log4j.LogManager
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -57,6 +59,8 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
   @ParameterizedTest
   @CsvSource(Array(
     "true,",
TestMetadataTableWithSparkDataSource.scala

@@ -20,14 +20,18 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.JavaConverters._
 
 @Tag("functional")
@@ -45,8 +49,11 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
-  @Test
-  def testReadability(): Unit = {
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
+  @ParameterizedTest
+  @ValueSource(ints = Array(1, 5))
+  def testReadability(compactNumDeltaCommits: Int): Unit = {
     val dataGen = new HoodieTestDataGenerator()
 
     val metadataOpts: Map[String, String] = Map(
@@ -55,7 +62,7 @@
     )
 
     val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
-      Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1")
+      Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> compactNumDeltaCommits.toString)
 
     // Insert records
     val newRecords = dataGen.generateInserts("001", 100)
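The switch from @Test to @ParameterizedTest above runs testReadability once per compaction cadence: with COMPACT_NUM_DELTA_COMMITS at 1 the metadata table compacts after every delta commit, at 5 only after every fifth. For readers unfamiliar with the JUnit 5 mechanics, a self-contained sketch under a hypothetical class and test name:

import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class ValueSourceSketch {
  // JUnit 5 invokes the method once per listed value.
  @ParameterizedTest
  @ValueSource(ints = Array(1, 5))
  def runsOncePerValue(compactNumDeltaCommits: Int): Unit = {
    assertTrue(compactNumDeltaCommits == 1 || compactNumDeltaCommits == 5)
  }
}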
TestParquetColumnProjection.scala

@@ -27,8 +27,10 @@ import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenera
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
@@ -55,6 +57,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
   )
 
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
   @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
   @Test
   def testBaseFileOnlyViewRelation(): Unit = {