[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_protobuf #37972
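This PR adds a new `spark-protobuf` module with `from_protobuf` and `to_protobuf` functions for converting between Protobuf binary payloads and Catalyst rows, mirroring the existing Avro functions. A hedged usage sketch — the import path, argument order, column names, descriptor path, and message name below are inferred from the expressions in this diff, not confirmed by it:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

// df is assumed to have a binary column "value" holding serialized
// Protobuf "Event" messages; the descriptor path is hypothetical.
def roundTrip(df: DataFrame): DataFrame = {
  val parsed = df.select(
    from_protobuf(col("value"), "/path/to/events.desc", "Event").as("event"))
  parsed.select(
    to_protobuf(col("event"), "/path/to/events.desc", "Event").as("value"))
}
```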
New file: pom.xml for the spark-protobuf module (119 lines)

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.12</artifactId>
    <version>3.4.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-protobuf_2.12</artifactId>
  <properties>
    <sbt.project.name>protobuf</sbt.project.name>
    <protobuf.version>3.21.1</protobuf.version>
  </properties>
  <packaging>jar</packaging>
  <name>Spark Protobuf</name>
  <url>https://spark.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
    </dependency>
    <!-- #if scala-2.13 --><!--
    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
    </dependency>
    --><!-- #endif scala-2.13 -->
    <dependency>
      <groupId>org.tukaani</groupId>
      <artifactId>xz</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>${protobuf.version}</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <includes>
              <include>com.google.protobuf:*</include>
            </includes>
          </artifactSet>
          <relocations>
            <relocation>
              <pattern>com.google.protobuf</pattern>
              <shadedPattern>${spark.shade.packageName}.spark-proto.protobuf</shadedPattern>
              <includes>
                <include>com.google.protobuf.**</include>
              </includes>
            </relocation>
          </relocations>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```

Review discussion on this file:

- rangadi: I am planning to work on Python support. @mposdev21 @SandishKumarHN let me know if you already have changes for it; otherwise, I can get started.
- Author: @rangadi we already have the changes ready. Should we create a separate PR now?
- rangadi: Yes, please go ahead with the PR if you can. We can start reviewing it even as this PR is being merged.

- rangadi: What about schema-registry support? I can look into that. Let me know @mposdev21, @SandishKumarHN.
- Contributor: Sure, Raghu. Go ahead.
- rangadi: Sure, I will take that.
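For context (not part of the diff): the `descFilePath` arguments used by the new expressions point at a Protobuf descriptor-set file, conventionally generated with `protoc --include_imports --descriptor_set_out=events.desc events.proto`. A minimal sketch of loading a message descriptor with the utility this PR adds — the file path and message name are hypothetical:

```scala
import org.apache.spark.sql.protobuf.utils.ProtobufUtils

// Build the Descriptor for message "Event" from a descriptor-set file;
// this is the same call the expressions below make lazily at runtime.
val descriptor = ProtobufUtils.buildDescriptor("/path/to/events.desc", "Event")
println(descriptor.getFullName)
```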
New file: CatalystDataToProtobuf.scala (org.apache.spark.sql.protobuf, 56 lines)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.protobuf

import com.google.protobuf.DynamicMessage

import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.types.{BinaryType, DataType}

private[protobuf] case class CatalystDataToProtobuf(
    child: Expression,
    descFilePath: String,
    messageName: String)
  extends UnaryExpression {

  override def dataType: DataType = BinaryType

  @transient private lazy val protoType =
    ProtobufUtils.buildDescriptor(descFilePath, messageName)

  @transient private lazy val serializer =
    new ProtobufSerializer(child.dataType, protoType, child.nullable)

  override def nullSafeEval(input: Any): Any = {
    val dynamicMessage = serializer.serialize(input).asInstanceOf[DynamicMessage]
    dynamicMessage.toByteArray
  }

  override def prettyName: String = "to_protobuf"

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val expr = ctx.addReferenceObj("this", this)
    defineCodeGen(ctx, ev, input =>
      s"(byte[]) $expr.nullSafeEval($input)")
  }

  override protected def withNewChildInternal(newChild: Expression): CatalystDataToProtobuf =
    copy(child = newChild)
}
```
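The expression above is not user-facing by itself; the PR exposes it through a `to_protobuf` function in a `functions` object that is not part of this excerpt. A hedged sketch of what that wrapper plausibly looks like, assuming it mirrors the Avro module's `functions.to_avro` — the signature and placement are assumptions:

```scala
import org.apache.spark.sql.Column

// Hypothetical wrapper: lift the Catalyst expression into a user-facing
// Column, the same pattern the Avro connector uses for to_avro.
def to_protobuf(data: Column, descFilePath: String, messageName: String): Column = {
  new Column(CatalystDataToProtobuf(data.expr, descFilePath, messageName))
}
```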
New file: ProtobufDataToCatalyst.scala (org.apache.spark.sql.protobuf, 149 lines; only the beginning of the file appears in this excerpt)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.protobuf

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.protobuf.DynamicMessage

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}

private[protobuf] case class ProtobufDataToCatalyst(child: Expression, descFilePath: String,
    messageName: String,
    options: Map[String, String])
```

A reviewer proposed a suggested change to the declaration:

```scala
// Suggested change: one parameter per line, uniformly indented.
private[protobuf] case class ProtobufDataToCatalyst(
    child: Expression,
    descFilePath: String,
    messageName: String,
    options: Map[String, String])
```
Review thread on the declaration above:

- Reviewer: Let's follow the Scala style guide: https://github.com/databricks/scala-style-guide#spacing-and-indentation. There are many such cases in this PR.
- Author: Okay, we will run the style check and make sure it passes. (Is dev/scalastyle a good way to verify?)
Review thread (outdated):

- rangadi: One of the important design decisions is what to carry with the class: this implementation carries the file path. Will it be available on all the executors? We could carry its content instead.
- Author: Currently we use --files to ship the descriptor file to all the executors. Could you elaborate on what you mean by carrying the content?
- rangadi: E.g., we can initialize a byte array with the content of the file and rebuild the message descriptor from the buffer when the object is deserialized on the executors. It may not be required yet; let's see. Note that we will be adding more ways to provide the Protobuf schema, and we can update this as part of that work: a schema registry, and a Java class in a jar (multiple customers have asked for this as well).
- Author: Agreed. Having a separate PR to address this would be better.
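A minimal sketch of the alternative rangadi describes — carrying the descriptor file's bytes instead of its path. All names here are hypothetical; this is not code from the PR:

```scala
import java.nio.file.{Files, Paths}

import com.google.protobuf.DescriptorProtos.FileDescriptorSet

// Hypothetical: read the descriptor-set bytes once on the driver. The byte
// array is serialized with the expression, so executors never need the path.
case class DescriptorBytes(bytes: Array[Byte]) {
  // Rebuilt lazily after deserialization on each executor.
  @transient lazy val fileDescriptorSet: FileDescriptorSet =
    FileDescriptorSet.parseFrom(bytes)
}

object DescriptorBytes {
  def fromFile(descFilePath: String): DescriptorBytes =
    DescriptorBytes(Files.readAllBytes(Paths.get(descFilePath)))
}
```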
Review thread (outdated):

- Reviewer: Nit: the name expectedSchema suggests more than it does. There is no schema reconciliation here as there is in Avro. We could call it something simpler, like messageDescriptor (or just descriptor).
- Author: Yeah, but once we support a schema registry there will be a notion of expectedSchema. For now, we can fix it.
Review comment (outdated):

- Reviewer: We can have this as a member initialized once. No need to pay the cost each time.
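A sketch of what the suggested fix might look like — the field and option names are hypothetical: hoist the per-record computation into a lazily initialized member, so it is computed once per deserialized expression instance.

```scala
// Hypothetical: resolved once on first use and cached thereafter,
// instead of being recomputed for every input record.
@transient private lazy val parseMode: ParseMode = protobufOptions.parseMode
```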
Review thread:

- rangadi: Is there a Jira open for this feature? We can open an epic; I am thinking of adding tasks under it. cc @mposdev21
- Author: No, there is no JIRA. Please go ahead and create the epic.
- rangadi: Created an epic with 5 tasks under it: https://issues.apache.org/jira/browse/SPARK-40653. I see that the PR title is already updated :). Nice.