7 changes: 7 additions & 0 deletions connector/protobuf/src/test/resources/protobuf/pyspark_test.desc
@@ -0,0 +1,7 @@

Member:

Was this generated by something else? It would be better to have a license header.

Contributor Author:

@HyukjinKwon this is a binary file generated by the protoc commands below. Although the .proto file has a license header in its comments, protoc seems to strip comments out of the generated descriptor, and we can't manually edit the .desc file either. I'm not sure what other options we have.

// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
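For anyone who wants to sanity-check the generated binary, here is a minimal sketch (an illustration only, assuming the protobuf Python package is installed; it is not part of this PR) that parses the descriptor set and lists the messages it contains:

from google.protobuf import descriptor_pb2

# Read the serialized FileDescriptorSet written by protoc --descriptor_set_out.
with open("connector/protobuf/src/test/resources/protobuf/pyspark_test.desc", "rb") as f:
    fds = descriptor_pb2.FileDescriptorSet()
    fds.ParseFromString(f.read())

# Print each .proto file in the set and the message types it defines.
for file_proto in fds.file:
    print(file_proto.name, [m.name for m in file_proto.message_type])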

Reviewer:

This could be part of the test build, similar to how we generate compiled classes during the build (the same applies to the Scala tests). We can do that as a follow-up with a TODO.

(Binary content of the generated descriptor: a serialized FileDescriptorSet for pyspark_test.proto in package org.apache.spark.sql.protobuf, defining SimpleMessage with fields age, name, and score and java_outer_classname SimpleMessageProtos; not reproducible as text.)
31 changes: 31 additions & 0 deletions connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// To compile and create test class:
// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
Reviewer:

Btw, do we need the --include-imports arg?
I think we should update our proto files to use a few imports rather than making each file self-sufficient.

Contributor Author:

@rangadi we could use --include-imports and get rid of the first of the two protoc commands. I will make this change as part of SPARK-40777.

Reviewer:

Thanks, sounds good. Let's also intentionally add some imports to the test proto files and use those messages.
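For reference, a single protoc invocation along the lines discussed might look like the following (note that protoc spells the flag --include_imports; this is only a sketch for the SPARK-40777 follow-up, not a command used by this PR):

// protoc --include_imports --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc connector/protobuf/src/test/resources/protobuf/pyspark_test.proto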


syntax = "proto3";

package org.apache.spark.sql.protobuf;
option java_outer_classname = "SimpleMessageProtos";


message SimpleMessage {
int32 age = 1;
string name = 2;
int64 score = 3;
}
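For orientation only (this mapping is inferred from the doctests later in this PR, not generated code), SimpleMessage roughly corresponds to the following Spark SQL struct on the Catalyst side:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

# Approximate Catalyst equivalent of SimpleMessage; nullability may differ in practice.
simple_message_schema = StructType([
    StructField("age", IntegerType()),    # int32 age = 1
    StructField("name", StringType()),    # string name = 2
    StructField("score", LongType()),     # int64 score = 3
])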
14 changes: 13 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -282,6 +282,17 @@ def __hash__(self):
],
)

protobuf = Module(
name="protobuf",
dependencies=[sql],
source_file_regexes=[
"connector/protobuf",
],
sbt_test_goals=[
"protobuf/test",
],
)

sketch = Module(
name="sketch",
dependencies=[tags],
@@ -423,7 +434,7 @@ def __hash__(self):

pyspark_sql = Module(
name="pyspark-sql",
dependencies=[pyspark_core, hive, avro],
dependencies=[pyspark_core, hive, avro, protobuf],
source_file_regexes=["python/pyspark/sql"],
python_test_goals=[
# doctests
Expand All @@ -443,6 +454,7 @@ def __hash__(self):
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
"pyspark.sql.protobuf.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
"pyspark.sql.pandas.group_ops",
16 changes: 8 additions & 8 deletions dev/sparktestsupport/utils.py
@@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
['graphx', 'examples']
>>> [x.name for x in determine_modules_to_test([modules.sql])]
... # doctest: +NORMALIZE_WHITESPACE
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10',
'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect',
'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf',
'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr',
'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sparkr, modules.sql], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sql, modules.core], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core',
'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource',
'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow',
'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql',
'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
"""
modules_to_test = set()
for module in changed_modules:
6 changes: 3 additions & 3 deletions project/SparkBuild.scala
@@ -59,7 +59,7 @@ object BuildCommons {
) = Seq(
"core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect) ++ Seq(protobuf)
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ Seq(connect)

val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
sparkGangliaLgpl, streamingKinesisAsl,
@@ -433,7 +433,7 @@ object SparkBuild extends PomBuild {

enable(SparkConnect.settings)(connect)

/* Connector/proto settings */
/* connector/protobuf settings */
enable(SparkProtobuf.settings)(protobuf)

// SPARK-14738 - Remove docker tests from main Spark build
@@ -1292,7 +1292,7 @@ object CopyDependencies {
Files.copy(fid.toPath, destJar.toPath)
} else if (jar.getName.contains("spark-protobuf") &&
!SbtPomKeys.profiles.value.contains("noshade-protobuf")) {
Files.copy(fid.toPath, destJar.toPath)
Files.copy(fidProtobuf.toPath, destJar.toPath)
} else {
Files.copy(jar.toPath(), destJar.toPath())
}
18 changes: 18 additions & 0 deletions python/pyspark/sql/protobuf/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["functions"]
189 changes: 189 additions & 0 deletions python/pyspark/sql/protobuf/functions.py
@@ -0,0 +1,189 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of built-in protobuf functions
"""


from typing import Dict, Optional, TYPE_CHECKING
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar

if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName


def from_protobuf(
data: "ColumnOrName",
descFilePath: str,
messageName: str,
Reviewer:

Could we change the order of the arguments (it might need a change on the Scala side as well)? I.e., make messageName the second arg. There will be multiple variations of this method, all of which will need messageName, so it can be the second arg.

Contributor Author:

@rangadi can we make this change part of SPARK-40657?

Reviewer:

Sure. I will do that as part of that.

options: Optional[Dict[str, str]] = None,
) -> Column:
"""
Converts a binary column of Protobuf format into its corresponding Catalyst value.
The specified schema must match the read data, otherwise the behavior is undefined:
it may fail or return an arbitrary result.
To deserialize the data with a compatible and evolved schema, the expected
Protobuf schema can be set via the option protobuf descriptor.
.. versionadded:: 3.4.0
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the binary column.
descFilePath : str
the path to the Protobuf descriptor file.
messageName: str
the Protobuf message name to look for in the descriptor file.
options : dict, optional
options to control how the Protobuf record is parsed.
Notes
-----
Protobuf is a built-in but external data source module since Spark 3.4. Please deploy the
application as per the deployment section of the "Protobuf Data Source Guide".
Examples
--------
>>> from pyspark.sql import Row
Reviewer:

This example is more complex because it uses both to_protobuf() and from_protobuf().
The majority of use cases will be for from_protobuf(). Is it worth simplifying this to just a from_protobuf() demo? We could keep some protobuf records in a file.

Contributor Author:

@rangadi to demo only from_protobuf()? We still need a protobuf message as input; here I'm simply producing one by converting a DataFrame with to_protobuf(). Otherwise we would need to build a protobuf dynamic message in Python, which requires installing the protobuf Python lib. I'm not sure which option is simplest.

Reviewer:

How about just initializing the bytes directly? In fact, it does not need to be a working example; it could even read from Kafka. With the current example, the simplicity of using this function to process protobufs is lost.

Most users of this function may not know much about protobufs at all. All they would know is that their input needs this function. But this is optional.

Contributor Author:

@rangadi functions.py does not just show examples; it executes every line and asserts on exact values, since these are doctests (PySpark unit tests). If we add an example that does not work, the build will fail, because functions.py is part of the Python tests. We can add examples such as from_protobuf on raw bytes or reading from Kafka in SPARK-40776, right?

    >>> protobufDf.show(truncate=False)
    +----------------------------------------+
    |protobuf                                |
    +----------------------------------------+
    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
    +----------------------------------------+

Reviewer:

I see, agreed. Examples as suggested in SPARK-40776 sound great; most users will read those.
We can leave it as it is. (A rough sketch of the bytes-only usage discussed here is included after this function.)

>>> from pyspark.sql.types import *
>>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
>>> data = ([Row(key="1", value=Row(age=2, name="Alice", score=109200))])
>>> schema = StructType([StructField("key", StringType(), False), \
StructField( "value", StructType([ StructField("age", IntegerType(), False), \
StructField("name", StringType(), False), StructField("score", LongType(), False), ]), False)])
>>> df = spark.createDataFrame(data, schema)
>>> descFilePath = 'connector/protobuf/src/test/resources/protobuf/pyspark_test.desc'
>>> messageName = 'SimpleMessage'
>>> protobufDf = df.select(to_protobuf(df.value, descFilePath, messageName).alias("protobuf"))
>>> protobufDf.collect()
[Row(protobuf=bytearray(b'\\x08\\x02\\x12\\x05Alice\\x18\\x90\\xd5\\x06'))]
>>> descFilePath = 'connector/protobuf/src/test/resources/protobuf/pyspark_test.desc'
Member:

I wonder if we can embed this into the doctests so users can copy and paste it to test.

Contributor Author:

@HyukjinKwon since from_protobuf and to_protobuf only take a file path as the schema param, users have to pass the pyspark_test.desc file. We have created multiple JIRAs under the "Protobuf Support" epic to support more ways of passing the protobuf schema.

>>> messageName = 'SimpleMessage'
>>> df = protobufDf.select(from_protobuf(protobufDf.protobuf, \
descFilePath, messageName).alias("value"))
>>> df.collect()
[Row(value=Row(age=2, name='Alice', score=109200))]
"""

sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
_to_java_column(data), descFilePath, messageName, options or {}
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)
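To illustrate the bytes-only usage discussed in the review thread above, here is a rough sketch (not part of this PR; it assumes the spark-protobuf jar is on the session classpath, and the hex payload is the SimpleMessage(age=2, name="Alice", score=109200) encoding already shown in the doctest output):

from pyspark.sql import SparkSession
from pyspark.sql.protobuf.functions import from_protobuf

spark = SparkSession.builder.getOrCreate()

# Raw Protobuf bytes for SimpleMessage(age=2, name="Alice", score=109200),
# taken from the doctest output above.
payload = bytes.fromhex("08021205416c6963651890d506")
df = spark.createDataFrame([(payload,)], "value binary")

desc_file = "connector/protobuf/src/test/resources/protobuf/pyspark_test.desc"
df.select(from_protobuf("value", desc_file, "SimpleMessage").alias("msg")).show(truncate=False)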


def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column:
"""
Converts a column into binary of protobuf format.
.. versionadded:: 3.4.0
Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the data column.
descFilePath : str
the path to the Protobuf descriptor file.
messageName: str
the Protobuf message name to look for in the descriptor file.
Notes
-----
Protobuf is a built-in but external data source module since Spark 3.4. Please deploy the
application as per the deployment section of the "Protobuf Data Source Guide".
Examples
--------
>>> from pyspark.sql import Row
>>> from pyspark.sql.protobuf.functions import to_protobuf
>>> from pyspark.sql.types import *
>>> descFilePath = 'connector/protobuf/src/test/resources/protobuf/pyspark_test.desc'
>>> messageName = 'SimpleMessage'
>>> data = ([Row(value=Row(age=2, name="Alice", score=13093020))])
>>> schema = StructType([StructField( "value", \
StructType([ StructField("age", IntegerType(), False), \
StructField("name", StringType(), False), \
StructField("score", LongType(), False), ]), False)])
>>> df = spark.createDataFrame(data, schema)
>>> df.select(to_protobuf(df.value, descFilePath, messageName).alias("suite")).collect()
[Row(suite=bytearray(b'\\x08\\x02\\x12\\x05Alice\\x18\\x9c\\x91\\x9f\\x06'))]
"""
sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
_to_java_column(data), descFilePath, messageName
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)
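As a follow-up to the Kafka point raised earlier in the review, a hypothetical sketch of feeding to_protobuf output into the Kafka sink might look like this (the broker address and topic name are placeholders; it assumes the spark-sql-kafka and spark-protobuf jars are available, and is not part of this PR):

from pyspark.sql import SparkSession, Row
from pyspark.sql.protobuf.functions import to_protobuf

spark = SparkSession.builder.getOrCreate()
desc_file = "connector/protobuf/src/test/resources/protobuf/pyspark_test.desc"

# Build a tiny DataFrame with a struct column shaped like SimpleMessage.
df = spark.createDataFrame(
    [Row(value=Row(age=2, name="Alice", score=109200))],
    "value struct<age:int, name:string, score:bigint>",
)

# Serialize the struct to Protobuf bytes and write it to a Kafka topic.
(df.select(to_protobuf(df.value, desc_file, "SimpleMessage").alias("value"))
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "simple_messages")
    .save())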


def _test() -> None:
import os
import sys
from pyspark.testing.utils import search_jar

protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
if protobuf_jar is None:
print(
"Skipping all Protobuf Python tests as the optional Protobuf project was "
"not compiled into a JAR. To run these tests, "
"you need to build Spark with 'build/sbt -Pprotobuf package' or "
"'build/mvn -Pprotobuf package' before running this test."
)
sys.exit(0)
else:
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % protobuf_jar
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

import doctest
from pyspark.sql import SparkSession
import pyspark.sql.protobuf.functions

globs = pyspark.sql.protobuf.functions.__dict__.copy()
spark = (
SparkSession.builder.master("local[*]")
.appName("sql.protobuf.functions tests")
.getOrCreate()
)
globs["spark"] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.sql.protobuf.functions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()