93 changes: 12 additions & 81 deletions .github/workflows/bot.yml
@@ -15,61 +15,22 @@ on:
- '**.pdf'
- '**.png'
- '**.svg'
- '**.yaml'
- '**.yml'
- '.gitignore'
branches:
- master
- 'release-*'
env:
MVN_ARGS: -e -ntp -B -V -Pwarn-log -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn -Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn
SPARK_COMMON_MODULES: hudi-spark-datasource/hudi-spark,hudi-spark-datasource/hudi-spark-common

jobs:
validate-source:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v2
with:
java-version: '8'
distribution: 'adopt'
architecture: x64
- name: Check Binary Files
run: ./scripts/release/validate_source_binary_files.sh
- name: Check Copyright
run: |
./scripts/release/create_source_directory.sh hudi-tmp-repo
cd hudi-tmp-repo
./scripts/release/validate_source_copyright.sh
- name: RAT check
run: ./scripts/release/validate_source_rat.sh

test-spark:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
fail-fast: false
matrix:
include:
- scalaProfile: "scala-2.11"
sparkProfile: "spark2.4"
sparkModules: "hudi-spark-datasource/hudi-spark2"

- scalaProfile: "scala-2.12"
sparkProfile: "spark2.4"
sparkModules: "hudi-spark-datasource/hudi-spark2"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.3"
sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
sparkProfile: "spark3.4"

steps:
- uses: actions/checkout@v2
@@ -95,27 +56,24 @@ jobs:
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-common,hudi-spark-datasource/hudi-spark,hudi-spark-datasource/hudi-spark-common,hudi-spark-datasource/hudi-spark3.4.x $MVN_ARGS
continue-on-error: true
- name: FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-spark-datasource/hudi-spark,hudi-spark-datasource/hudi-spark-common,hudi-spark-datasource/hudi-spark3.4.x $MVN_ARGS
continue-on-error: true
test-flink:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
matrix:
include:
- flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
steps:
- uses: actions/checkout@v2
@@ -140,21 +98,12 @@ jobs:

validate-bundles:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
matrix:
include:
- flinkProfile: 'flink1.16'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.2'
- flinkProfile: 'flink1.15'
sparkProfile: 'spark3.3'
sparkRuntime: 'spark3.3.1'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- flinkProfile: 'flink1.13'
sparkProfile: 'spark3.1'
sparkRuntime: 'spark3.1.3'
sparkProfile: 'spark3.4'
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -173,29 +122,11 @@ jobs:
# TODO remove the sudo below. It's a needed workaround as detailed in HUDI-5708.
sudo chown -R "$USER:$(id -g -n)" hudi-platform-service/hudi-metaserver/target/generated-sources
mvn clean package -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -pl packaging/hudi-flink-bundle -am -Davro.version=1.10.0
- name: IT - Bundle Validation - OpenJDK 8
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk8
- name: IT - Bundle Validation - OpenJDK 11
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk11
- name: IT - Bundle Validation - OpenJDK 17
- name: IT - Bundle Validation
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: 'scala-2.12'
if: ${{ endsWith(env.SPARK_PROFILE, '3.3') }} # Only Spark 3.3 supports Java 17 as of now
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk17
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
@@ -33,6 +33,8 @@ protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
conf.setIfUnset(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
conf.setIfUnset(SQLConf.CASE_SENSITIVE().key(), SQLConf.CASE_SENSITIVE().defaultValueString());
conf.setIfUnset(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().defaultValueString());
conf.setIfUnset(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().defaultValueString());
// Using string value of this conf to preserve compatibility across spark versions.
conf.setIfUnset("spark.sql.legacy.parquet.nanosAsLong", "false");
return new HoodieSparkParquetReader(conf, path);
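As an aside, a minimal sketch of the cross-version pattern behind the comment above (the object name is illustrative): spelling the key as a string literal avoids a compile-time reference to a SQLConf constant that may not exist in every supported Spark release, while setIfUnset keeps any user-provided value.

    import org.apache.hadoop.conf.Configuration

    object NanosAsLongConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Same effect as the call in the diff: only set the default when the
        // key has not been configured already.
        conf.setIfUnset("spark.sql.legacy.parquet.nanosAsLong", "false")
        println(conf.get("spark.sql.legacy.parquet.nanosAsLong")) // prints "false"
      }
    }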
@@ -50,6 +50,7 @@ private[hudi] trait SparkVersionsSupport {
def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1")
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
def isSpark3_4: Boolean = getSparkVersion.startsWith("3.4")

def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
@@ -59,6 +60,7 @@ private[hudi] trait SparkVersionsSupport {
def gteqSpark3_2_2: Boolean = getSparkVersion >= "3.2.2"
def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
def gteqSpark3_3_2: Boolean = getSparkVersion >= "3.3.2"
def gteqSpark3_4: Boolean = getSparkVersion >= "3.4"
}

object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport with Logging {
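A minimal usage sketch for the new flags (the call site is hypothetical; assumes hudi-spark-common on the classpath): as the definitions above show, isSpark3_4 is a prefix check and gteqSpark3_4 a lexicographic comparison against the running Spark version, so gating a 3.4-only code path looks the same as the existing 3.1/3.2/3.3 checks.

    import org.apache.hudi.HoodieSparkUtils

    object SparkVersionGateSketch {
      def main(args: Array[String]): Unit = {
        // Pick a code path based on the detected Spark version.
        if (HoodieSparkUtils.gteqSpark3_4) {
          println(s"Spark ${HoodieSparkUtils.getSparkVersion}: taking the 3.4+ path")
        } else if (HoodieSparkUtils.gteqSpark3_3_2) {
          println("Spark 3.3.2+: taking the pre-3.4 path")
        } else {
          println("Older Spark: taking the legacy path")
        }
      }
    }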
@@ -33,7 +33,9 @@ trait SparkAdapterSupport {
object SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3_3) {
val adapterClass = if (HoodieSparkUtils.isSpark3_4) {
"org.apache.spark.sql.adapter.Spark3_4Adapter"
} else if (HoodieSparkUtils.isSpark3_3) {
"org.apache.spark.sql.adapter.Spark3_3Adapter"
} else if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
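For context, a hedged sketch of how a class name chosen this way is typically loaded (the actual instantiation code sits below the truncated hunk and is not shown here); the reflective call is illustrative and only succeeds when the matching adapter module, e.g. hudi-spark3.4.x, and its Spark version are on the classpath.

    object AdapterLoadingSketch {
      def main(args: Array[String]): Unit = {
        val adapterClass = "org.apache.spark.sql.adapter.Spark3_4Adapter"
        // Instantiate the version-specific adapter by name; throws
        // ClassNotFoundException if the adapter module is absent.
        val adapter = Class.forName(adapterClass)
          .getDeclaredConstructor()
          .newInstance()
        println(s"Loaded adapter: ${adapter.getClass.getName}")
      }
    }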
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
@@ -303,5 +303,11 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
</dependencies>
</project>
@@ -76,9 +76,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
// TODO Issue with setting this to true in spark 332
if (!HoodieSparkUtils.gteqSpark3_3_2) {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
}
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
}

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
@@ -203,7 +201,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
// NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a
// list of globbed paths which complicates partitioning discovery for Spark.
// Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details.
PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
FileIndexOptions.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
),
partitionColumns = partitionColumns
)
@@ -564,7 +564,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

BaseFileReader(
read = partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
val extension = FSUtils.getFileExtension(partitionedFile.filePath.toString)
if (tableBaseFileFormat.getFileExtension.equals(extension)) {
read(partitionedFile)
} else {
@@ -715,7 +715,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {

partitionedFile => {
val hadoopConf = hadoopConfBroadcast.value.get()
val reader = new HoodieAvroHFileReader(hadoopConf, new Path(partitionedFile.filePath),
val reader = new HoodieAvroHFileReader(hadoopConf, partitionedFile.filePath.toPath,
new CacheConfig(hadoopConf))

val requiredRowSchema = requiredDataSchema.structTypeSchema
@@ -23,6 +23,7 @@ import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema,
import org.apache.hudi.HoodieBootstrapRelation.validate
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -75,12 +76,12 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
if (baseFile.getBootstrapBaseFile.isPresent) {
val partitionValues =
getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, extractPartitionValuesFromPartitionPath = isPartitioned)
val dataFile = PartitionedFile(partitionValues, baseFile.getBootstrapBaseFile.get().getPath, 0, baseFile.getBootstrapBaseFile.get().getFileLen)
val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
val dataFile = PartitionedFile(partitionValues, SparkPath.fromPathString(baseFile.getBootstrapBaseFile.get().getPath), 0, baseFile.getBootstrapBaseFile.get().getFileLen)
val skeletonFile = Option(PartitionedFile(InternalRow.empty, SparkPath.fromPathString(baseFile.getPath), 0, baseFile.getFileLen))

HoodieBootstrapSplit(dataFile, skeletonFile)
} else {
val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), SparkPath.fromPathString(baseFile.getPath), 0, baseFile.getFileLen)
HoodieBootstrapSplit(dataFile)
}
}
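A minimal sketch of the Spark 3.4 PartitionedFile shape the change above targets (illustrative path and sizes; assumes Spark 3.4 on the classpath): the file location argument is now a SparkPath built from a path string or a Hadoop Path, rather than a raw String.

    import org.apache.spark.paths.SparkPath
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    object PartitionedFileSketch {
      def main(args: Array[String]): Unit = {
        // Spark 3.4 signature: (partitionValues, filePath: SparkPath, start, length, ...)
        val file = PartitionedFile(
          InternalRow.empty,
          SparkPath.fromPathString("/tmp/hudi/2021-02-02/part-0000.parquet"),
          0L,
          1024L)
        println(file.filePath.toPath) // convert back to a Hadoop Path when needed
      }
    }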
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
@@ -85,7 +86,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size)
}
}

@@ -342,7 +342,7 @@ object LogFileIterator {
// Determine partition path as an immediate parent folder of either
// - The base file
// - Some log file
split.dataFile.map(baseFile => new Path(baseFile.filePath))
split.dataFile.map(baseFile => baseFile.filePath.toPath)
.getOrElse(split.logFiles.head.getPath)
.getParent
}
@@ -22,10 +22,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.{getFilePath, isProjectionCompatible}
import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -233,8 +234,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
val logFiles = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList

val partitionedBaseFile = baseFile.map { file =>
val filePath = getFilePath(file.getFileStatus.getPath)
PartitionedFile(getPartitionColumnsAsInternalRow(file.getFileStatus), filePath, 0, file.getFileLen)
PartitionedFile(getPartitionColumnsAsInternalRow(file.getFileStatus), SparkPath.fromPath(file.getFileStatus.getPath), 0, file.getFileLen)
}

HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
@@ -258,21 +258,4 @@ object MergeOnReadSnapshotRelation {

def isProjectionCompatible(tableState: HoodieTableState): Boolean =
projectionCompatiblePayloadClasses.contains(tableState.recordPayloadClassName)

def getFilePath(path: Path): String = {
// Here we use the Path#toUri to encode the path string, as there is a decode in
// ParquetFileFormat#buildReaderWithPartitionValues in the spark project when read the table
// .So we should encode the file path here. Otherwise, there is a FileNotException throw
// out.
// For example, If the "pt" is the partition path field, and "pt" = "2021/02/02", If
// we enable the URL_ENCODE_PARTITIONING and write data to hudi table.The data path
// in the table will just like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read
// data from the table, if there are no encode for the file path,
// ParquetFileFormat#buildReaderWithPartitionValues will decode it to
// "/basePath/2021/02/02/xxxx.parquet" witch will result to a FileNotException.
// See FileSourceScanExec#createBucketedReadRDD in spark project which do the same thing
// when create PartitionedFile.
path.toUri.toString
}

}
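Since the hunk above deletes the long comment explaining why file paths had to be URI-encoded by hand, here is a hedged sketch (assuming Spark 3.4's org.apache.spark.paths.SparkPath) of the equivalence that makes the getFilePath helper unnecessary: SparkPath.fromPath carries the same URI form that the helper used to produce via path.toUri.toString.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.paths.SparkPath

    object FilePathEncodingSketch {
      def main(args: Array[String]): Unit = {
        val hadoopPath = new Path("/basePath/2021-02-02/part-0000.parquet")
        // What the removed getFilePath helper returned:
        val viaHelper = hadoopPath.toUri.toString
        // What SparkPath now stores internally and exposes through toUri:
        val viaSparkPath = SparkPath.fromPath(hadoopPath).toUri.toString
        println(viaHelper == viaSparkPath) // expected: true
      }
    }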
@@ -42,6 +42,7 @@ import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
@@ -419,7 +420,7 @@ class HoodieCDCRDD(
assert(currentCDCFileSplit.getCdcFiles != null && currentCDCFileSplit.getCdcFiles.size() == 1)
val absCDCPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
val fileStatus = fs.getFileStatus(absCDCPath)
val pf = PartitionedFile(InternalRow.empty, absCDCPath.toUri.toString, 0, fileStatus.getLen)
val pf = PartitionedFile(InternalRow.empty, SparkPath.fromPath(absCDCPath), 0, fileStatus.getLen)
recordIter = parquetReader(pf)
case BASE_FILE_DELETE =>
assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
@@ -525,7 +526,7 @@ class HoodieCDCRDD(
val baseFileStatus = fs.getFileStatus(new Path(fileSlice.getBaseFile.get().getPath))
val basePartitionedFile = PartitionedFile(
InternalRow.empty,
pathToString(baseFileStatus.getPath),
SparkPath.fromPath(baseFileStatus.getPath),
0,
baseFileStatus.getLen
)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{AtomicType, StructType}


class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
@@ -34,6 +34,11 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport

override def toString: String = "Hoodie-Parquet"

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
}

override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
@@ -84,7 +84,6 @@ case class AlterHoodieTableAddColumnsCommand(

SchemaUtils.checkColumnNameDuplication(
newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)

sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema)