Merged
54 commits
d05e966
Support Spark 3.5.0
CTTY Aug 24, 2023
f0eaf5d
Adjust pom version
CTTY Sep 14, 2023
eef5eaf
see all test failures
CTTY Sep 14, 2023
c8841b5
Fix insert into statement related fix
CTTY Sep 15, 2023
2c0b0ad
add hive-storage-api as provided in hudi-spark
CTTY Sep 15, 2023
1dfc1d5
Minor compilation fix
CTTY Sep 15, 2023
ae78542
minor
CTTY Sep 15, 2023
852fd5a
Fix case sensitive test for Spark35
CTTY Sep 16, 2023
703571b
Spark backward compatibility
CTTY Sep 18, 2023
fb7f79e
minor fix
CTTY Sep 18, 2023
5a95586
minor
CTTY Sep 18, 2023
68ee51f
More tests fix
CTTY Sep 18, 2023
deed85d
Fix all UTs for Spark 3.5.0
CTTY Sep 18, 2023
5333aff
checkstyle fix
CTTY Sep 19, 2023
f154bb0
InsertIntoStatement backward compatibility
CTTY Sep 19, 2023
c6abfc8
Split DataSourceUtils for 3.2-3.5
CTTY Sep 19, 2023
36af850
more fix for backward compatibility
CTTY Sep 19, 2023
5236001
minor fix
CTTY Sep 19, 2023
1e08edd
minor fix
CTTY Sep 19, 2023
447d0eb
use getEncoder
CTTY Sep 19, 2023
7a6c589
fix import
CTTY Sep 19, 2023
11b44b1
minor
CTTY Sep 19, 2023
d873449
minor
CTTY Sep 19, 2023
d93392a
Fix TestReflectUtil arg numbers
CTTY Sep 19, 2023
31b8d8e
minor
CTTY Sep 19, 2023
60f6d25
UT passed for all Spark versions, re-enable flink tests and fixing bu…
CTTY Sep 19, 2023
b6bafdb
Fixed all Spark35 UT, need to fix classpath for older Spark
CTTY Sep 19, 2023
9894447
make hive-storage-api a 3.5 profile dependency
CTTY Sep 19, 2023
4410d32
trigger CI
CTTY Sep 20, 2023
5fa1611
Revert "trigger CI"
CTTY Sep 21, 2023
4bdfb34
Skipping Spark 3.4 CI due to GHCI SSL issue
CTTY Sep 21, 2023
255a126
remove hive-storage-api from hudi-spark pom
CTTY Sep 22, 2023
0db8e2a
Fixed all GHCI UT, fixing Azure CI
CTTY Sep 22, 2023
e0168f4
Split test classes and move classes to each Spark submodule to avoid …
CTTY Sep 23, 2023
bc54076
Add exclusions for spark-core back, add parquet-avro as test dep for …
CTTY Sep 23, 2023
3dd01ff
Add teardown for TestHoodieFileGroupReaderOnSpark
CTTY Oct 27, 2023
30825c1
Port changes from HUDI-6963 to Spark 3.5 and rebase
CTTY Oct 28, 2023
b97bae1
Fix SqlBase for Spark 3.5
CTTY Oct 31, 2023
056aaf2
Fix error msg for TestHoodieRowCreateHandle#testGlobalFailure
CTTY Oct 31, 2023
f3e3d55
Add script to build Spark 35 docker image, not sure why GHCI was canc…
CTTY Oct 31, 2023
e95f7a0
minor, fixing docker image
CTTY Oct 31, 2023
89ed60b
Use personal image and skip bundle validation for non-spark35 tempora…
CTTY Nov 1, 2023
1729a37
Include protobuf in utilities bundle to resolve class not found issue
CTTY Nov 1, 2023
733c9a1
Include protobuf in utilities and utilities-slim bundle and relocate it
CTTY Nov 2, 2023
584967f
Update comment
CTTY Nov 3, 2023
96032c3
Fix build after rebasing master
yihua Nov 8, 2023
3e1289b
Fix bundle validation for Spark 3.5 and GH CI script
yihua Nov 15, 2023
1efbe4e
Move util methods from SparkAdapter to corresponding util classes
yihua Nov 15, 2023
af28064
Address nits and add docs
yihua Nov 15, 2023
6e762ec
Fix pom
yihua Nov 15, 2023
a83f451
Fix build
yihua Nov 15, 2023
8ac9c76
Fix nits
yihua Nov 16, 2023
017a375
Change Spark 3 profile to use Spark 3.5
yihua Nov 16, 2023
afe70da
Shade protobuf in utilities-slim bundle instead
yihua Nov 16, 2023
13 changes: 13 additions & 0 deletions .github/workflows/bot.yml
@@ -80,6 +80,10 @@ jobs:
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
@@ -163,6 +167,9 @@ jobs:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
@@ -255,6 +262,9 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
@@ -284,6 +294,9 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- flinkProfile: 'flink1.18'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'
HoodieSparkUtils.scala
@@ -51,6 +51,7 @@ private[hudi] trait SparkVersionsSupport {
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
def isSpark3_4: Boolean = getSparkVersion.startsWith("3.4")
def isSpark3_5: Boolean = getSparkVersion.startsWith("3.5")

def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
@@ -61,6 +62,7 @@ private[hudi] trait SparkVersionsSupport {
def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
def gteqSpark3_3_2: Boolean = getSparkVersion >= "3.3.2"
def gteqSpark3_4: Boolean = getSparkVersion >= "3.4"
def gteqSpark3_5: Boolean = getSparkVersion >= "3.5"
}

object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport with Logging {
SparkAdapterSupport.scala
@@ -33,7 +33,9 @@ trait SparkAdapterSupport {
object SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3_4) {
val adapterClass = if (HoodieSparkUtils.isSpark3_5) {
"org.apache.spark.sql.adapter.Spark3_5Adapter"
} else if (HoodieSparkUtils.isSpark3_4) {
"org.apache.spark.sql.adapter.Spark3_4Adapter"
} else if (HoodieSparkUtils.isSpark3_3) {
"org.apache.spark.sql.adapter.Spark3_3Adapter"
DataFrameUtil.scala
@@ -18,6 +18,7 @@

package org.apache.spark.sql

import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
@@ -31,7 +32,8 @@ object DataFrameUtil {
*/
def createFromInternalRows(sparkSession: SparkSession, schema:
StructType, rdd: RDD[InternalRow]): DataFrame = {
val logicalPlan = LogicalRDD(schema.toAttributes, rdd)(sparkSession)
val logicalPlan = LogicalRDD(
SparkAdapterSupport.sparkAdapter.getSchemaUtils.toAttributes(schema), rdd)(sparkSession)
Dataset.ofRows(sparkSession, logicalPlan)
}
}
}
HoodieCatalystExpressionUtils.scala
@@ -18,20 +18,22 @@
package org.apache.spark.sql

import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeEq, AttributeReference, Cast, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateStruct, Expression, GetStructField, Like, Literal, Projection, SubqueryExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeEq, AttributeReference, AttributeSet, Cast, Expression, Like, Literal, SubqueryExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}

trait HoodieCatalystExpressionUtils {

/**
* SPARK-44531 Encoder inference moved elsewhere in Spark 3.5.0
* Mainly used for unit tests
*/
def getEncoder(schema: StructType): ExpressionEncoder[Row]
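A minimal sketch of what a Spark 3.5 implementation of this new abstract method could look like, assuming Spark 3.5's `RowEncoder.encoderFor` and `ExpressionEncoder(AgnosticEncoder)` APIs; illustrative only, not necessarily the adapter code added by this PR:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types.StructType

object Spark35EncoderSketch {
  // Spark 3.5: RowEncoder no longer yields an ExpressionEncoder directly, so build one
  // from the AgnosticEncoder and resolve/bind it for direct use in tests.
  def getEncoder(schema: StructType): ExpressionEncoder[Row] =
    ExpressionEncoder(RowEncoder.encoderFor(schema)).resolveAndBind()
}
```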

/**
* Returns a filter that its reference is a subset of `outputSet` and it contains the maximum
* constraints from `condition`. This is used for predicate push-down
@@ -269,7 +271,7 @@ object HoodieCatalystExpressionUtils extends SparkAdapterSupport {
}

private def generateUnsafeProjectionInternal(from: StructType, to: StructType): UnsafeProjection = {
val attrs = from.toAttributes
val attrs = sparkAdapter.getSchemaUtils.toAttributes(from)
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))

HoodieSchemaUtils.scala
@@ -19,6 +19,9 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType

/**
* Utils on schema, which have different implementation across Spark versions.
*/
@@ -34,4 +37,10 @@ trait HoodieSchemaUtils {
def checkColumnNameDuplication(columnNames: Seq[String],
colType: String,
caseSensitiveAnalysis: Boolean): Unit

/**
* SPARK-44353 StructType#toAttributes was removed in Spark 3.5.0
* Use DataTypeUtils#toAttributes for Spark 3.5+
*/
def toAttributes(struct: StructType): Seq[Attribute]
}
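For illustration, a Spark 3.5-side implementation could simply delegate to `DataTypeUtils`, as the comment above suggests. This is a sketch under that assumption, not the exact class added by the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.types.StructType

object Spark35SchemaUtilsSketch {
  // Spark 3.5+: StructType#toAttributes is gone; DataTypeUtils provides the same conversion.
  def toAttributes(struct: StructType): Seq[Attribute] =
    DataTypeUtils.toAttributes(struct)
}
```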
HoodieUnsafeUtils.scala
@@ -18,7 +18,7 @@

package org.apache.spark.sql

import org.apache.hudi.HoodieUnsafeRDD
import org.apache.hudi.{HoodieUnsafeRDD, SparkAdapterSupport}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
@@ -68,14 +68,15 @@ object HoodieUnsafeUtils {
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]]
*
* NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
* will be executed by Spark locally
* will be executed by Spark locally
*
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param spark spark's session
* @param rows collection of rows to base [[DataFrame]] on
* @param schema target [[DataFrame]]'s schema
*/
def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
Dataset.ofRows(spark, LocalRelation.fromExternalRows(
SparkAdapterSupport.sparkAdapter.getSchemaUtils.toAttributes(schema), rows))
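A small usage sketch of the documented helper (hypothetical driver program, not part of the PR), showing that the `toAttributes` indirection needs no version-specific code at the call site:

```scala
import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateDataFrameFromRowsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("local-relation-sketch").getOrCreate()
    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    // Backed by a LocalRelation, so the rows are evaluated locally rather than distributed.
    val df = HoodieUnsafeUtils.createDataFrameFromRows(spark, Seq(Row(1, "a"), Row(2, "b")), schema)
    df.show()
    spark.stop()
  }
}
```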

/**
* Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]]
@@ -88,7 +89,7 @@
* @param schema target [[DataFrame]]'s schema
*/
def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
Dataset.ofRows(spark, LocalRelation(SparkAdapterSupport.sparkAdapter.getSchemaUtils.toAttributes(schema), rows))


/**
HoodieSparkPartitionedFileUtils.scala
@@ -19,11 +19,11 @@

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow

/**
* Utils on Spark [[PartitionedFile]] to adapt to type changes.
* Utils on Spark [[PartitionedFile]] and [[PartitionDirectory]] to adapt to type changes.
* Before Spark 3.4.0,
* ```
* case class PartitionedFile(
@@ -65,13 +65,23 @@ trait HoodieSparkPartitionedFileUtils extends Serializable {
* Creates a new [[PartitionedFile]] instance.
*
* @param partitionValues value of partition columns to be prepended to each row.
* @param filePath URI of the file to read.
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param filePath URI of the file to read.
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @return a new [[PartitionedFile]] instance.
*/
def createPartitionedFile(partitionValues: InternalRow,
filePath: Path,
start: Long,
length: Long): PartitionedFile

/**
* SPARK-43039 FileIndex#PartitionDirectory refactored in Spark 3.5.0
*/
def toFileStatuses(partitionDirs: Seq[PartitionDirectory]): Seq[FileStatus]

/**
* SPARK-43039 FileIndex#PartitionDirectory refactored in Spark 3.5.0
*/
def newPartitionDirectory(internalRow: InternalRow, statuses: Seq[FileStatus]): PartitionDirectory
}
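As a sketch only: on Spark 3.5 these two methods could be implemented against the `FileStatusWithMetadata` wrapper assumed to come from SPARK-43039, while pre-3.5 adapters can pass `FileStatus` values straight through:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionDirectory}

object Spark35PartitionedFileUtilsSketch {
  // Spark 3.5 wraps each listed file in FileStatusWithMetadata, so unwrap when reading...
  def toFileStatuses(partitionDirs: Seq[PartitionDirectory]): Seq[FileStatus] =
    partitionDirs.flatMap(_.files).map(_.fileStatus)

  // ...and wrap when building a PartitionDirectory from plain FileStatus values.
  def newPartitionDirectory(internalRow: InternalRow, statuses: Seq[FileStatus]): PartitionDirectory =
    PartitionDirectory(internalRow, statuses.map(FileStatusWithMetadata(_)))
}
```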
SparkAdapter.scala
@@ -19,14 +19,15 @@
package org.apache.spark.sql.hudi

import org.apache.avro.Schema
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
TestHoodieAvroUtils.java
@@ -295,7 +295,7 @@ public void testRemoveFields() {
// partitioned table test.
String schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"}]},";
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"}]}";
Schema expectedSchema = new Schema.Parser().parse(schemaStr);
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
rec.put("_row_key", "key1");
@@ -318,7 +318,7 @@ public void testRemoveFields() {
schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
Review comment (Contributor): Interesting. Does it fail the test before with the comma at the end of the schema String?

Reply (author): No, it didn't fail before. Avro 1.11.2 enforces a stricter schema format.
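To illustrate the point (hypothetical snippet, not from the PR): the old test strings ended in `]},`, and it is that trailing comma which the newer, stricter Avro parser rejects:

```scala
import org.apache.avro.Schema

object AvroTrailingCommaSketch {
  private val recordJson =
    """{"type": "record", "name": "testrec", "fields": [
      |  {"name": "timestamp", "type": "double"},
      |  {"name": "_row_key", "type": "string"},
      |  {"name": "non_pii_col", "type": "string"}
      |]}""".stripMargin

  def main(args: Array[String]): Unit = {
    new Schema.Parser().parse(recordJson)        // accepted by all Avro versions
    new Schema.Parser().parse(recordJson + ",")  // rejected by Avro 1.11.2+, ignored by older parsers
  }
}
```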

expectedSchema = new Schema.Parser().parse(schemaStr);
rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
assertEquals(expectedSchema, rec1.getSchema());
TestClusteringUtils.java
@@ -37,6 +37,7 @@
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
@@ -107,6 +108,7 @@ public void testClusteringPlanMultipleInstants() throws Exception {

// replacecommit.inflight doesn't have clustering plan.
// Verify that getClusteringPlan fetches content from corresponding requested file.
@Disabled("Will fail due to avro issue AVRO-3789. This is fixed in avro 1.11.3")
@Test
public void testClusteringPlanInflight() throws Exception {
String partitionPath1 = "partition1";
@@ -20,6 +20,7 @@
package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -40,22 +41,16 @@
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;

@@ -244,10 +239,6 @@ private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session,
}

private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
return SparkAdapterSupport$.MODULE$.sparkAdapter().getCatalystExpressionUtils().getEncoder(schema);
}
}
HoodieBaseRelation.scala
@@ -66,7 +66,6 @@ import org.apache.spark.sql.{Row, SQLContext, SparkSession}

import java.net.URI
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

trait HoodieFileSplit {}
@@ -423,7 +422,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
val fsView = new HoodieTableFileSystemView(
metaClient, timeline, sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs).toArray)

fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
val relativePath = getRelativePartitionPath(basePath, partitionPath)
HoodieCDCFileIndex.scala
@@ -59,7 +59,7 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
}}.toList
val partitionValues: InternalRow = new GenericInternalRow(doParsePartitionColumnValues(
metaClient.getTableConfig.getPartitionFields.get(), partitionPath).asInstanceOf[Array[Any]])
PartitionDirectory(
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
new HoodiePartitionCDCFileGroupMapping(
partitionValues, fileGroups.map(kv => kv._1 -> kv._2.asScala.toList).toMap),
fileGroupIds
HoodieFileIndex.scala
@@ -167,9 +167,11 @@ case class HoodieFileIndex(spark: SparkSession,
|| (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
if (c.nonEmpty) {
PartitionDirectory(new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly)
} else {
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
}

} else {
@@ -184,7 +186,8 @@
baseFileStatusOpt.foreach(f => files.append(f))
files
})
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
}
}

HoodieIncrementalFileIndex.scala
@@ -67,9 +67,11 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
|| (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
if (c.nonEmpty) {
PartitionDirectory(new HoodiePartitionFileSliceMapping(partitionValues, c), baseFileStatusesAndLogFileOnly)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
new HoodiePartitionFileSliceMapping(partitionValues, c), baseFileStatusesAndLogFileOnly)
} else {
PartitionDirectory(partitionValues, baseFileStatusesAndLogFileOnly)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
partitionValues, baseFileStatusesAndLogFileOnly)
}
} else {
val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
@@ -83,7 +85,8 @@
baseFileStatusOpt.foreach(f => files.append(f))
files
})
PartitionDirectory(partitionValues, allCandidateFiles)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
partitionValues, allCandidateFiles)
}
}.toSeq
