2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -153,7 +153,7 @@ jobs:
streaming, sql-kafka-0-10, streaming-kafka-0-10,
mllib-local, mllib,
yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
connect
connect, protobuf
# Here, we split Hive and SQL tests into some of slow ones and the rest of them.
included-tags: [""]
excluded-tags: [""]
32 changes: 32 additions & 0 deletions connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// TODO(SPARK-40777): Instead of saving .desc files in resources, generate during build.
// To compile and create test class:
// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
Review comment:
Btw, do we need --include-imports arg?
I think we should update our proto files to do a few imports rather than making each file self-sufficient.

Reply (PR author):

@rangadi we could use --include-imports and get rid of the first line of the protoc command. I will make this change part of SPARK-40777.

Reply:
Thanks, sounds good. Also, intentionally add some imports to the test proto files and use those messages.


syntax = "proto3";

package org.apache.spark.sql.protobuf;
option java_outer_classname = "SimpleMessageProtos";


message SimpleMessage {
int32 age = 1;
string name = 2;
int64 score = 3;
}
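A note tying the protoc commands above to the PySpark doctests later in this diff: the long desc_hex literal used there is simply a hex dump of the descriptor set generated from this .proto. A minimal sketch for regenerating it (editorial illustration, not part of this PR; assumes protoc is on the PATH and the script is run from the Spark repository root):

import subprocess
from pathlib import Path

proto = "connector/protobuf/src/test/resources/protobuf/pyspark_test.proto"
desc = "connector/protobuf/src/test/resources/protobuf/pyspark_test.desc"

# Regenerate the descriptor set. --include_imports follows the reviewer's
# suggestion above and is a no-op while this proto has no imports.
subprocess.run(
    ["protoc", "--include_imports", "--descriptor_set_out=" + desc, proto],
    check=True,
)

# Hex dump of the descriptor bytes; this should reproduce the desc_hex
# literal embedded in the doctests below.
print(Path(desc).read_bytes().hex().upper())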
14 changes: 13 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -282,6 +282,17 @@ def __hash__(self):
],
)

protobuf = Module(
name="protobuf",
dependencies=[sql],
source_file_regexes=[
"connector/protobuf",
],
sbt_test_goals=[
"protobuf/test",
],
)

sketch = Module(
name="sketch",
dependencies=[tags],
@@ -423,7 +434,7 @@ def __hash__(self):

pyspark_sql = Module(
name="pyspark-sql",
dependencies=[pyspark_core, hive, avro],
dependencies=[pyspark_core, hive, avro, protobuf],
source_file_regexes=["python/pyspark/sql"],
python_test_goals=[
# doctests
@@ -443,6 +454,7 @@ def __hash__(self):
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
"pyspark.sql.protobuf.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
"pyspark.sql.pandas.group_ops",
16 changes: 8 additions & 8 deletions dev/sparktestsupport/utils.py
@@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
['graphx', 'examples']
>>> [x.name for x in determine_modules_to_test([modules.sql])]
... # doctest: +NORMALIZE_WHITESPACE
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10',
'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect',
'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf',
'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr',
'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sparkr, modules.sql], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sql, modules.core], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core',
'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource',
'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow',
'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql',
'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
"""
modules_to_test = set()
for module in changed_modules:
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/index.rst
@@ -40,3 +40,4 @@ This page gives an overview of all public Spark SQL API.
avro
observation
udf
protobuf
28 changes: 28 additions & 0 deletions python/docs/source/reference/pyspark.sql/protobuf.rst
@@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
========
Protobuf
========
.. currentmodule:: pyspark.sql.protobuf.functions

.. autosummary::
:toctree: api/

from_protobuf
to_protobuf
18 changes: 18 additions & 0 deletions python/pyspark/sql/protobuf/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["functions"]
215 changes: 215 additions & 0 deletions python/pyspark/sql/protobuf/functions.py
@@ -0,0 +1,215 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of builtin protobuf functions
"""


from typing import Dict, Optional, TYPE_CHECKING
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar

if TYPE_CHECKING:
from pyspark.sql._typing import ColumnOrName


def from_protobuf(
data: "ColumnOrName",
descFilePath: str,
messageName: str,
Review comment:

Could we change the order of the arguments (might need a change on the Scala side as well)? I.e. make messageName the second arg. There will be multiple variations of this method, all of which will need messageName, so it can be the second arg.

Reply (PR author):

@rangadi can we make this change part of SPARK-40657?

Reply:

Sure. I will do that as part of that.

options: Optional[Dict[str, str]] = None,
) -> Column:
"""
Converts a binary column of Protobuf format into its corresponding catalyst value.
The specified schema must match the read data, otherwise the behavior is undefined:
it may fail or return an arbitrary result.
To deserialize the data with a compatible and evolved schema, the expected
Protobuf schema can be set via the protobuf descriptor option.

.. versionadded:: 3.4.0

Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the binary column.
descFilePath : str
the protobuf descriptor file path.
messageName: str
the protobuf message name to look for in the descriptor file.
options : dict, optional
options to control how the protobuf record is parsed.

Notes
-----
Protobuf functionality is provided as a pluggable external module.

Examples
--------
>>> import tempfile
>>> data = [("1", (2, "Alice", 109200))]
>>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
Review comment:
Is the desc file included in this PR? If not, should this have failed?

Reply:
Never mind, you are using the bytes.
Will generate it at build time in a follow-up.

... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select(
... to_protobuf(df.value, desc_file_path, message_name).alias("value"))
... proto_df.show(truncate=False)
... proto_df = proto_df.select(
... from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
... proto_df.show(truncate=False)
+----------------------------------------+
|value |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value |
+------------------+
|{2, Alice, 109200}|
+------------------+
"""

sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
_to_java_column(data), descFilePath, messageName, options or {}
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)
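An aside on the review thread above about argument order: the reordering deferred to SPARK-40657 would presumably look something like the sketch below. This is an editorial illustration only, not part of this PR, and the final shape is up to that follow-up.

# Hypothetical follow-up signature (SPARK-40657): messageName promoted to the
# second positional argument. Type names are those imported at the top of this
# file; this is not what the PR implements.
def from_protobuf(
    data: "ColumnOrName",
    messageName: str,
    descFilePath: str,
    options: Optional[Dict[str, str]] = None,
) -> Column:
    ...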


def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column:
"""
Converts a column into binary of protobuf format.

.. versionadded:: 3.4.0

Parameters
----------
data : :class:`~pyspark.sql.Column` or str
the data column.
descFilePath : str
the protobuf descriptor file path.
messageName: str
the protobuf message name to look for in the descriptor file.

Notes
-----
Protobuf functionality is provided as a pluggable external module.

Examples
--------
>>> import tempfile
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
... f.flush()
... message_name = 'SimpleMessage'
... proto_df = df.select(
... to_protobuf(df.value, desc_file_path, message_name).alias("suite"))
... proto_df.show(truncate=False)
+-------------------------------------------+
|suite |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
"""
sc = SparkContext._active_spark_context
assert sc is not None and sc._jvm is not None
try:
jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
_to_java_column(data), descFilePath, messageName
)
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
_print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
raise
return Column(jc)


def _test() -> None:
import os
import sys
from pyspark.testing.utils import search_jar

protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
if protobuf_jar is None:
print(
"Skipping all Protobuf Python tests as the optional Protobuf project was "
"not compiled into a JAR. To run these tests, "
"you need to build Spark with 'build/sbt package' or "
"'build/mvn package' before running this test."
)
sys.exit(0)
else:
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % protobuf_jar
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

import doctest
from pyspark.sql import SparkSession
import pyspark.sql.protobuf.functions

globs = pyspark.sql.protobuf.functions.__dict__.copy()
spark = (
SparkSession.builder.master("local[2]")
.appName("sql.protobuf.functions tests")
.getOrCreate()
)
globs["spark"] = spark
(failure_count, test_count) = doctest.testmod(
pyspark.sql.protobuf.functions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
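Finally, a minimal usage sketch outside the doctest harness (editorial illustration, not part of this PR): attaching the spark-protobuf assembly JAR to a SparkSession so that from_protobuf/to_protobuf resolve on the JVM side. The JAR path below is a placeholder; the actual file name depends on the Scala and Spark versions and comes from building Spark with build/sbt package, as noted in _test above.

from pyspark.sql import SparkSession
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# Placeholder path to the assembly JAR produced by the Spark build; adjust it
# to whatever search_jar() finds under connector/protobuf in your checkout.
protobuf_jar = "connector/protobuf/target/scala-2.12/spark-protobuf-assembly-SNAPSHOT.jar"

spark = (
    SparkSession.builder.master("local[2]")
    .appName("pyspark-protobuf-demo")
    # Same effect as the --jars submit arg used in _test() above.
    .config("spark.jars", protobuf_jar)
    .getOrCreate()
)
# from_protobuf and to_protobuf can now be used exactly as in the doctests above.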