164 changes: 82 additions & 82 deletions .github/workflows/bot.yml
@@ -51,25 +51,25 @@ jobs:
strategy:
matrix:
include:
- scalaProfile: "scala-2.11"
sparkProfile: "spark2.4"
sparkModules: "hudi-spark-datasource/hudi-spark2"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.0"
sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.3"
sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
# - scalaProfile: "scala-2.11"
# sparkProfile: "spark2.4"
# sparkModules: "hudi-spark-datasource/hudi-spark2"
#
# - scalaProfile: "scala-2.12"
# sparkProfile: "spark3.0"
# sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
#
# - scalaProfile: "scala-2.12"
# sparkProfile: "spark3.1"
# sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"
#
# - scalaProfile: "scala-2.12"
# sparkProfile: "spark3.2"
# sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
#
# - scalaProfile: "scala-2.12"
# sparkProfile: "spark3.3"
# sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
@@ -117,9 +117,9 @@ jobs:
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.3"
sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
# - scalaProfile: "scala-2.12"
# sparkProfile: "spark3.3"
# sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
@@ -166,45 +166,45 @@ jobs:
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-flink:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
- flinkProfile: "flink1.17"
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'adopt'
architecture: x64
- name: Build Project
env:
SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
mvn clean install -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
- name: Quickstart Test
env:
SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink $MVN_ARGS
- name: Integration Test
env:
SCALA_PROFILE: 'scala-2.12'
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ endsWith(env.FLINK_PROFILE, '1.17') }}
run: |
mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS
#
# test-flink:
# runs-on: ubuntu-latest
# strategy:
# matrix:
# include:
# - flinkProfile: "flink1.13"
# - flinkProfile: "flink1.14"
# - flinkProfile: "flink1.15"
# - flinkProfile: "flink1.16"
# - flinkProfile: "flink1.17"
# steps:
# - uses: actions/checkout@v3
# - name: Set up JDK 8
# uses: actions/setup-java@v3
# with:
# java-version: '8'
# distribution: 'adopt'
# architecture: x64
# - name: Build Project
# env:
# SCALA_PROFILE: 'scala-2.12'
# FLINK_PROFILE: ${{ matrix.flinkProfile }}
# run:
# mvn clean install -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
# - name: Quickstart Test
# env:
# SCALA_PROFILE: 'scala-2.12'
# FLINK_PROFILE: ${{ matrix.flinkProfile }}
# run:
# mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink $MVN_ARGS
# - name: Integration Test
# env:
# SCALA_PROFILE: 'scala-2.12'
# FLINK_PROFILE: ${{ matrix.flinkProfile }}
# if: ${{ endsWith(env.FLINK_PROFILE, '1.17') }}
# run: |
# mvn clean install -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink -am -Davro.version=1.10.0 -DskipTests=true $MVN_ARGS
# mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS

docker-java17-test:
runs-on: ubuntu-latest
@@ -242,27 +242,27 @@ jobs:
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
- flinkProfile: 'flink1.17'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
- flinkProfile: 'flink1.16'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
- flinkProfile: 'flink1.15'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.1'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- flinkProfile: 'flink1.13'
sparkProfile: 'spark3.1'
sparkRuntime: 'spark3.1.3'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.0'
sparkRuntime: 'spark3.0.2'
- flinkProfile: 'flink1.13'
sparkProfile: 'spark2.4'
sparkRuntime: 'spark2.4.8'
# - flinkProfile: 'flink1.17'
# sparkProfile: 'spark3.3'
# sparkRuntime: 'spark3.3.2'
# - flinkProfile: 'flink1.16'
# sparkProfile: 'spark3.3'
# sparkRuntime: 'spark3.3.2'
# - flinkProfile: 'flink1.15'
# sparkProfile: 'spark3.3'
# sparkRuntime: 'spark3.3.1'
# - flinkProfile: 'flink1.14'
# sparkProfile: 'spark3.2'
# sparkRuntime: 'spark3.2.3'
# - flinkProfile: 'flink1.13'
# sparkProfile: 'spark3.1'
# sparkRuntime: 'spark3.1.3'
# - flinkProfile: 'flink1.14'
# sparkProfile: 'spark3.0'
# sparkRuntime: 'spark3.0.2'
# - flinkProfile: 'flink1.13'
# sparkProfile: 'spark2.4'
# sparkRuntime: 'spark2.4.8'
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8

@@ -51,6 +51,7 @@ private[hudi] trait SparkVersionsSupport {
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
def isSpark3_4: Boolean = getSparkVersion.startsWith("3.4")
def isSpark3_5: Boolean = getSparkVersion.startsWith("3.5")

def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"

@@ -33,7 +33,9 @@ trait SparkAdapterSupport {
object SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3_4) {
val adapterClass = if (HoodieSparkUtils.isSpark3_5) {
"org.apache.spark.sql.adapter.Spark3_4Adapter"
} else if (HoodieSparkUtils.isSpark3_4) {
"org.apache.spark.sql.adapter.Spark3_4Adapter"
} else if (HoodieSparkUtils.isSpark3_3) {
"org.apache.spark.sql.adapter.Spark3_3Adapter"
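
Note on the hunk above: as shown, the new isSpark3_5 branch currently resolves to the existing Spark3_4Adapter class name. The sketch below only illustrates how a version-gated lookup could look once a dedicated 3.5 adapter exists; Spark3_5Adapter is a hypothetical class name and is not introduced by this diff.

    // Sketch only: the "3.5" branch target below is an assumption, not part of this change.
    object AdapterSelectionSketch {
      def adapterClassFor(sparkVersion: String): String =
        if (sparkVersion.startsWith("3.5")) "org.apache.spark.sql.adapter.Spark3_5Adapter" // hypothetical
        else if (sparkVersion.startsWith("3.4")) "org.apache.spark.sql.adapter.Spark3_4Adapter"
        else if (sparkVersion.startsWith("3.3")) "org.apache.spark.sql.adapter.Spark3_3Adapter"
        else throw new UnsupportedOperationException(s"Unsupported Spark version: $sparkVersion")

      def main(args: Array[String]): Unit =
        println(adapterClassFor("3.5.0")) // prints the hypothetical 3.5 adapter class name
    }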

@@ -20,6 +20,7 @@ package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType

@@ -31,7 +32,7 @@ object DataFrameUtil {
*/
def createFromInternalRows(sparkSession: SparkSession, schema:
StructType, rdd: RDD[InternalRow]): DataFrame = {
val logicalPlan = LogicalRDD(schema.toAttributes, rdd)(sparkSession)
val logicalPlan = LogicalRDD(DataTypeUtils.toAttributes(schema), rdd)(sparkSession)
Dataset.ofRows(sparkSession, logicalPlan)
}
}
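
The DataFrameUtil change above replaces schema.toAttributes with DataTypeUtils.toAttributes(schema); StructType.toAttributes appears to be gone in Spark 3.5, and the catalyst-side utility is the replacement this diff adopts in the touched files. A small, self-contained sketch of the new call, assuming Spark 3.5 is on the classpath:

    // Sketch: converting a schema to catalyst attributes via DataTypeUtils, as the diff now does.
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object ToAttributesSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
        val attrs: Seq[AttributeReference] = DataTypeUtils.toAttributes(schema)
        attrs.foreach(a => println(s"${a.name}: ${a.dataType}")) // id: IntegerType, name: StringType
      }
    }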

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeEq, AttributeReference, Cast, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateStruct, Expression, GetStructField, Like, Literal, Projection, SubqueryExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
@@ -269,7 +270,7 @@ object HoodieCatalystExpressionUtils extends SparkAdapterSupport {
}

private def generateUnsafeProjectionInternal(from: StructType, to: StructType): UnsafeProjection = {
val attrs = from.toAttributes
val attrs = DataTypeUtils.toAttributes(from)
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))


@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
@@ -75,7 +76,7 @@ object HoodieUnsafeUtils {
* @param schema target [[DataFrame]]'s schema
*/
def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
Dataset.ofRows(spark, LocalRelation.fromExternalRows(DataTypeUtils.toAttributes(schema), rows))

/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]]
@@ -88,7 +89,7 @@
* @param schema target [[DataFrame]]'s schema
*/
def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
Dataset.ofRows(spark, LocalRelation(DataTypeUtils.toAttributes(schema), rows))


/**

@@ -34,10 +34,10 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.catalyst.types.DataTypeUtils;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -95,9 +95,9 @@ public class SparkDatasetTestUtils {
* @return the encoder thus generated.
*/
private static ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
List<Attribute> attributes = JavaConversions.asJavaCollection(DataTypeUtils.toAttributes(schema)).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
return ExpressionEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
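
In the test utility above, the encoder is now built with ExpressionEncoder.apply(schema) instead of RowEncoder.apply(schema), which appears to be the supported way to obtain a row encoder on Spark 3.5. A hedged Scala sketch of the same call pattern (only the call itself is confirmed by the diff; the surrounding code is illustrative):

    // Sketch, assuming Spark 3.5: build an encoder for a Row schema without RowEncoder.apply.
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    object RowEncoderSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(StructField("ts", LongType)))
        val encoder = ExpressionEncoder(schema) // replaces the pre-3.5 RowEncoder.apply(schema)
        println(encoder.schema)                 // prints the single-field schema
      }
    }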
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
@@ -198,6 +198,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<version>${hive.storage.version}</version>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>

@@ -420,7 +420,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).map(_.fileStatus).toArray)

fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(basePath, partitionPath)
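
The added .map(_.fileStatus) above reflects that on Spark 3.5 the entries returned by listFiles are FileStatusWithMetadata wrappers rather than raw Hadoop FileStatus objects, so the underlying status is unwrapped before being handed to HoodieTableFileSystemView. A small sketch of just that unwrapping step, under the same Spark 3.5 assumption:

    // Sketch: unwrap Hadoop FileStatus from Spark 3.5's FileStatusWithMetadata entries.
    // On earlier Spark versions the listing already yields FileStatus, so no mapping is needed there.
    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.execution.datasources.PartitionDirectory

    object UnwrapFileStatusSketch {
      def listedFileStatuses(partitionDirs: Seq[PartitionDirectory]): Array[FileStatus] =
        partitionDirs.flatMap(_.files).map(_.fileStatus).toArray
    }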

@@ -149,15 +149,15 @@ case class HoodieFileIndex(spark: SparkSession,
val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
case (partitionOpt, fileSlices) =>
if (shouldEmbedFileSlices) {
val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => {
val baseFileStatusesAndLogFileOnly: Array[FileStatus] = fileSlices.map(slice => {
if (slice.getBaseFile.isPresent) {
slice.getBaseFile.get().getFileStatus
} else if (slice.getLogFiles.findAny().isPresent) {
slice.getLogFiles.findAny().get().getFileStatus
} else {
null
}
}).filter(slice => slice != null)
}).filter(slice => slice != null).toArray
val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
|| (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
@@ -168,7 +168,7 @@
}

} else {
val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
val allCandidateFiles: Array[FileStatus] = fileSlices.flatMap(fs => {
val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
val logFilesStatus = if (includeLogFiles) {
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
Expand All @@ -178,7 +178,7 @@ case class HoodieFileIndex(spark: SparkSession,
val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
baseFileStatusOpt.foreach(f => files.append(f))
files
})
}).toArray
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
}
}

@@ -49,7 +49,7 @@ class HoodieInMemoryFileIndex(sparkSession: SparkSession,
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath)).toArray) :: Nil
} else {
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
@@ -62,7 +62,7 @@
// Directory does not exist, or has no children files
Nil
}
PartitionDirectory(values, files)
PartitionDirectory(values, files.toArray)
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
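
Both file-index changes convert the candidate file listing to an Array[FileStatus] before calling PartitionDirectory(...). That matches the call pattern used in the hunks above and suggests the Spark 3.5 PartitionDirectory factory takes an Array of FileStatus (wrapping each entry internally) where earlier code could pass a Seq. A minimal sketch of that construction, under the same assumption:

    // Sketch, assuming Spark 3.5: build a PartitionDirectory from a plain Array[FileStatus].
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionDirectory

    object PartitionDirectorySketch {
      def main(args: Array[String]): Unit = {
        val files: Array[FileStatus] =
          Array(new FileStatus(0L, false, 1, 0L, 0L, new Path("/tmp/warehouse/part=0/f1.parquet")))
        // An empty values row mirrors the non-partitioned branch in HoodieInMemoryFileIndex above.
        val dir = PartitionDirectory(InternalRow.empty, files)
        println(dir.files.size) // 1
      }
    }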