[SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf #38212
Conversation
@gengliangwang @rangadi here is the PR for the PySpark from_protobuf and to_protobuf functions.
You may also want to cc @HyukjinKwon.
Force-pushed from a391f00 to 29e2f6b.
Force-pushed from 29e2f6b to d2c16f6.
Force-pushed from d2c16f6 to b7ed8e8.
@@ -0,0 +1,7 @@
Was this generated by something else? It would be better to add a license header.
@HyukjinKwon this is a binary file generated by the protoc commands below. Although the .proto file has comments, protoc seems to strip them when generating the descriptor, and we can't manually edit the .desc file either, so I'm not sure what other options we have.
// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
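For reference, the .desc file written by `--descriptor_set_out` is a serialized `FileDescriptorSet`, which is why it is binary and cannot carry a license header or hand edits. A minimal sketch for inspecting it, assuming the `google.protobuf` Python package is installed and the test-resource path above:

```python
# Sketch: parse the binary descriptor set and list the messages it defines.
from google.protobuf import descriptor_pb2

desc_path = "connector/protobuf/src/test/resources/protobuf/pyspark_test.desc"

fds = descriptor_pb2.FileDescriptorSet()
with open(desc_path, "rb") as f:
    fds.ParseFromString(f.read())

for file_proto in fds.file:
    # file_proto is a FileDescriptorProto; comments from the .proto are not
    # retained unless --include_source_info is passed to protoc.
    print(file_proto.name, [m.name for m in file_proto.message_type])
```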
This could be generated as part of the test build, similar to how we generate compiled classes during the build (same for the Scala tests). We can do that as a follow-up with a TODO.
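A hedged sketch of what such a build/test-time step could look like (not the actual follow-up; assumes protoc is on PATH and the script is run from the repository root):

```python
# Sketch: regenerate the descriptor set before tests instead of checking in
# the binary .desc file. Paths match the test resources referenced above.
import subprocess

proto_file = "connector/protobuf/src/test/resources/protobuf/pyspark_test.proto"
desc_file = "connector/protobuf/src/test/resources/protobuf/pyspark_test.desc"

# With no -I flag, protoc uses the current directory as the import path.
subprocess.run(
    ["protoc", f"--descriptor_set_out={desc_file}", proto_file],
    check=True,  # surface protoc failures as a build/test error
)
```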
>>> protobufDf.collect()
[Row(protobuf=bytearray(b'\\x08\\x02\\x12\\x05Alice\\x18\\x90\\xd5\\x06'))]
>>> descFilePath = 'connector/protobuf/src/test/resources/protobuf/pyspark_test.desc'
I wonder if we can embed this into the doctests so users can copy and paste it to test.
@HyukjinKwon since from_protobuf and to_protobuf only take a file path as the schema parameter, users have to pass the pyspark_test.desc file. We have created multiple JIRAs under the "Protobuf Support" epic to support other ways of passing the protobuf schema.
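One way to make the doctest copy-paste friendly, which is roughly what the final version does, is to embed the serialized descriptor as a hex string and write it to a temporary file. A sketch, assuming the functions are importable from `pyspark.sql.protobuf.functions` (the module this PR adds), an active `spark` session, and the argument order used by the doctest in this PR:

```python
# Sketch of a self-contained example: no checked-in .desc file is needed,
# because the descriptor bytes are embedded as hex and written to a temp file.
import tempfile
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# Serialized FileDescriptorSet for pyspark_test.proto, as hex (same value as
# in the merged commit message below).
desc_hex = (
    "0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726"
    "5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61"
    "70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121"
    "00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363"
    "6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707"
    "26F746F33"
)

data = [("1", (2, "Alice", 109200))]
ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
df = spark.createDataFrame(data, ddl_schema)  # assumes an active SparkSession named `spark`

with tempfile.TemporaryDirectory() as tmp_dir:
    desc_file_path = "%s/pyspark_test.desc" % tmp_dir
    with open(desc_file_path, "wb") as f:
        f.write(bytearray.fromhex(desc_hex))

    # Struct -> protobuf bytes, then back, using the SimpleMessage descriptor.
    proto_df = df.select(
        to_protobuf(df.value, desc_file_path, "SimpleMessage").alias("value"))
    decoded = proto_df.select(
        from_protobuf(proto_df.value, desc_file_path, "SimpleMessage").alias("value"))
    decoded.show(truncate=False)
```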
HyukjinKwon left a comment:
Please also fix .github/labeler.yml to add this file to the Protobuf component.
Force-pushed from 665747e to 77d2da0.
@HyukjinKwon @gengliangwang can we merge the PR?
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
Is the desc file included in this PR? If not, should this have failed?
Nm. You are using the bytes.
Will generate it at build time in a follow up.
connector/protobuf/src/test/resources/protobuf/pyspark_test.proto (outdated review comment, resolved)
LGTM. Thanks for working on this, this is a nice feature.
Force-pushed from 536dabe to bbc1e68.
Will merge once the tests pass.
Force-pushed from efc1e98 to f5eafc6.
Force-pushed from f5eafc6 to d2707fb.
@HyukjinKwon All checks have passed.

Merged to master.
… to_protobuf

From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com)

This PR follows main PR apache#37972

The following is an example of how to use from_protobuf and to_protobuf in PySpark.

```python
data = [("1", (2, "Alice", 109200))]
ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
df = spark.createDataFrame(data, ddl_schema)
desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...            '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...            '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...            '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...            '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...            '26F746F33')
import tempfile
with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...     message_name = 'SimpleMessage'
...     proto_df = df.select(to_protobuf(df.value,
...         desc_file_path, message_name).alias("value"))
...     proto_df.show(truncate=False)
...     proto_df = proto_df.select(from_protobuf(proto_df.value,
...         desc_file_path, message_name).alias("value"))
...     proto_df.show(truncate=False)
+----------------------------------------+
|value                                   |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
```

### Tests Covered
- from_protobuf / to_protobuf (functions.py)

Closes apache#38212 from SandishKumarHN/PYSPARK_PROTOBUF.

Authored-by: SandishKumarHN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…lasses

This is the follow-up PR to #37972 and #38212

### What changes were proposed in this pull request?
1. Move spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support protobuf imports.
3. Validate protobuf timestamp and duration types.

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
None

### How was this patch tested?
Existing tests should cover the validation of this PR.

CC: rangadi mposdev21 gengliangwang

Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

Authored-by: SandishKumarHN <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>