[SPARK-28218][SQL] Migrate Avro to File Data Source V2 #25017
Conversation
This is the last migration for file source V2. It is a relatively simple one. Please help review it.

Test build #107051 has finished for PR 25017 at commit

Thank you for pinging me, @gengliangwang.

Retest this please.
Resolved review threads (outdated) on external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
Test build #107144 has finished for PR 25017 at commit

Test build #107217 has finished for PR 25017 at commit
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
  val parsedOptions = new AvroOptions(options, job.getConfiguration)
Previously, this was the following (sharedState.sparkContext.hadoopConfiguration + SQLConf). Is job.getConfiguration enough for Avro?
val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
Yes, it is enough. ORC/Parquet also use the configuration from the job.
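For context, a minimal sketch of the fallback pattern under discussion (hypothetical helper, not the actual `AvroOptions` implementation): the user-supplied option wins, and only then does the Hadoop configuration apply, so either way of obtaining the configuration works as long as it carries the session's settings.

```scala
// Hypothetical sketch of option resolution. `confGet` stands in for
// Hadoop's Configuration.get; the key name is the legacy Hadoop key that
// Avro's old MapReduce input format used (assumed here for illustration).
def resolveIgnoreExtension(
    parameters: Map[String, String],
    confGet: String => Option[String]): Boolean = {
  parameters.get("ignoreExtension")                                 // user option wins
    .orElse(confGet("avro.mapred.ignore.inputs.without.extension")) // then Hadoop conf
    .map(_.toBoolean)
    .getOrElse(false)                                               // default: respect the extension
}
```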
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
  val parsedOptions = new AvroOptions(options, conf)
  val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)

  if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
Shall we have the same comment above this line, so we don't forget it?

// TODO Removes this check once `FileFormat` gets a general file filtering interface method.
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
Actually, there is an option pathGlobFilter for this. I have marked it as deprecated in #24518.
I think we can still support it in 3.0, so I am not sure what to comment here.
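For context, `pathGlobFilter` is a read option that prunes non-matching files when the file listing is planned; a usage sketch (assuming an active `SparkSession` named `spark` and a hypothetical path):

```scala
// Only files matching the glob are listed, so no tasks are scheduled for
// the skipped files at all, unlike the per-file extension check inside
// the reader shown above.
val df = spark.read
  .format("avro")
  .option("pathGlobFilter", "*.avro")
  .load("/path/to/dir-with-mixed-files")
```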
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
    paths: Seq[String],
    userSpecifiedSchema: Option[StructType],
    fallbackFileFormat: Class[_ <: FileFormat])
  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) with Logging {
Let's remove `with Logging` and line 23.
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
dongjoon-hyun left a comment
I left a few comments. Could you update the PR, @gengliangwang ?
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
Resolved review thread (outdated) on external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala
@dongjoon-hyun I have updated the code. Thanks for reviewing this during your vacation!

Test build #107263 has finished for PR 25017 at commit
dongjoon-hyun left a comment
+1, LGTM. Merged to master.
Thank you, @gengliangwang !
What changes were proposed in this pull request?
Migrate Avro to File source V2.
How was this patch tested?
Unit tests.
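A usage sketch of the public API after the migration (paths hypothetical; the V2 implementation is selected internally, so the user-facing calls are unchanged):

```scala
// Reading and writing Avro goes through the same DataFrame API as before;
// the migration swaps the implementation underneath, not the API surface.
// `spark` is assumed to be an active SparkSession.
val events = spark.read.format("avro").load("/data/events")
events.write.format("avro").save("/data/events_copy")
```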