From ad1c38b9595401a7a11d387634d4954c50caf0ea Mon Sep 17 00:00:00 2001 From: Martin Grund Date: Fri, 25 Nov 2022 18:46:18 +0100 Subject: [PATCH 1/4] [SPARK-41114] [CONNECT] [PYTHON] [FOLLOW-UP] Python Client support for local data --- python/pyspark/sql/connect/client.py | 4 +++ python/pyspark/sql/connect/plan.py | 34 ++++++++++++++++++- .../sql/tests/connect/test_connect_basic.py | 9 +++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index b41df12c357c..e7f0d4d890fd 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -29,6 +29,7 @@ import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib import pyspark.sql.types from pyspark import cloudpickle +from pyspark.sql.connect import plan from pyspark.sql.connect.dataframe import DataFrame from pyspark.sql.connect.readwriter import DataFrameReader from pyspark.sql.connect.plan import SQL, Range @@ -328,6 +329,9 @@ def __init__(self, connectionString: str = "sc://localhost", userId: Optional[st # Create the reader self.read = DataFrameReader(self) + def createDataFrame(self, pdf: "pandas.DataFrame") -> "DataFrame": + return DataFrame.withPlan(plan.LocalRelation(pdf), self) + def register_udf( self, function: Any, return_type: Union[str, pyspark.sql.types.DataType] ) -> str: diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 853b1a6dc0e1..8deb8313503e 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -25,7 +25,8 @@ TYPE_CHECKING, Mapping, ) - +import pandas +import pyarrow as pa import pyspark.sql.connect.proto as proto from pyspark.sql.connect.column import ( Column, @@ -177,6 +178,37 @@ def _repr_html_(self) -> str: """ +class LocalRelation(LogicalPlan): + """Creates a LocalRelation plan object based on a Pandas DataFrame.""" + + def __init__(self, pdf: "pandas.DataFrame") -> None: + super().__init__(None) + self._pdf = pdf + + def plan(self, session: "RemoteSparkSession") -> proto.Relation: + assert self._pdf is not None + + sink = pa.BufferOutputStream() + table = pa.Table.from_pandas(self._pdf) + with pa.ipc.new_stream(sink, table.schema) as writer: + for b in table.to_batches(): + writer.write_batch(b) + + plan = proto.Relation() + plan.local_relation.data = sink.getvalue().to_pybytes() + return plan + + def print(self, indent: int = 0) -> str: + return f"{' ' * indent}\n" + + def _repr_html_(self) -> str: + return f""" + + """ + + class ShowString(LogicalPlan): def __init__( self, child: Optional["LogicalPlan"], numRows: int, truncate: int, vertical: bool diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 845d6ead567e..7fef24dee562 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -130,6 +130,15 @@ def conv_udf(x) -> str: result = df.select(u(df.id)).toPandas() self.assertIsNotNone(result) + def test_with_local_data(self): + """SPARK-41114: Test creating a dataframe using local data""" + pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + df = self.connect.createDataFrame(pdf) + rows = df.filter(df.a == lit(3)).collect() + self.assertTrue(len(rows) == 1) + self.assertEqual(rows[0][0], 3) + self.assertEqual(rows[0][1], "c") + def test_simple_explain_string(self): df = self.connect.read.table(self.tbl_name).limit(10) result = df.explain() From d223a8265de4fb28b44f82a067f1c1cfcfd58df5 Mon Sep 17 00:00:00 2001 From: Martin Grund Date: Sat, 26 Nov 2022 21:56:43 +0100 Subject: [PATCH 2/4] comments --- python/pyspark/sql/connect/session.py | 29 ++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index d1840a5b8b9a..3cd00e78a064 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -17,6 +17,7 @@ from threading import RLock from typing import Optional, Any, Union, Dict, cast, overload +import pandas as pd import pyspark.sql.types from pyspark.sql.connect.client import SparkConnectClient @@ -24,10 +25,10 @@ from pyspark.sql.connect.plan import SQL, Range from pyspark.sql.connect.readwriter import DataFrameReader from pyspark.sql.utils import to_str +from . import plan from ._typing import OptionalPrimitiveType - # TODO(SPARK-38912): This method can be dropped once support for Python 3.8 is dropped # In Python 3.9, the @property decorator has been made compatible with the # @classmethod decorator (https://docs.python.org/3.9/library/functions.html#classmethod) @@ -206,8 +207,30 @@ def __init__(self, connectionString: str, userId: Optional[str] = None): # Create the reader self.read = DataFrameReader(self) - def createDataFrame(self, pdf: "pandas.DataFrame") -> "DataFrame": - return DataFrame.withPlan(plan.LocalRelation(pdf), self) + def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame": + """ + Creates a :class:`DataFrame` from a :class:`pandas.DataFrame`. + + .. versionadded:: 3.4.0 + + + Parameters + ---------- + data : :class:`pandas.DataFrame` + + Returns + ------- + :class:`DataFrame` + + Examples + -------- + >>> import pandas + >>> pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + >>> self.connect.createDataFrame(pdf).collect() + [Row(a=1, b='a'), Row(a=2, b='b'), Row(a=3, b='c')] + + """ + return DataFrame.withPlan(plan.LocalRelation(data), self) @property def client(self) -> "SparkConnectClient": From deeb0aa807e0d7fb4fcc1e3bf1d36a8d6d3d10ad Mon Sep 17 00:00:00 2001 From: Martin Grund Date: Sat, 26 Nov 2022 21:58:21 +0100 Subject: [PATCH 3/4] merge --- python/pyspark/sql/connect/plan.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 2bbb02c0c280..caaa8f717337 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -185,7 +185,7 @@ def __init__(self, pdf: "pandas.DataFrame") -> None: super().__init__(None) self._pdf = pdf - def plan(self, session: "RemoteSparkSession") -> proto.Relation: + def plan(self, session: "SparkConnectClient") -> proto.Relation: assert self._pdf is not None sink = pa.BufferOutputStream() @@ -202,7 +202,7 @@ def print(self, indent: int = 0) -> str: return f"{' ' * indent}\n" def _repr_html_(self) -> str: - return f""" + return """
  • LocalRelation
From 991d0698fa2259f3ea249c99681e02d4698e83bc Mon Sep 17 00:00:00 2001 From: Martin Grund Date: Sun, 27 Nov 2022 22:28:59 +0100 Subject: [PATCH 4/4] review comments --- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 3 +++ python/pyspark/sql/connect/plan.py | 2 -- python/pyspark/sql/connect/session.py | 3 +++ python/pyspark/sql/tests/connect/test_connect_basic.py | 5 +++++ 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 9c4299d652fd..041ab5447d77 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -277,6 +277,9 @@ class SparkConnectPlanner(session: SparkSession) { val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator( Iterator(rel.getData.toByteArray), TaskContext.get()) + if (structType == null) { + throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.") + } val attributes = structType.toAttributes val proj = UnsafeProjection.create(attributes, attributes) new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index caaa8f717337..c6eeea957aaa 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -186,8 +186,6 @@ def __init__(self, pdf: "pandas.DataFrame") -> None: self._pdf = pdf def plan(self, session: "SparkConnectClient") -> proto.Relation: - assert self._pdf is not None - sink = pa.BufferOutputStream() table = pa.Table.from_pandas(self._pdf) with pa.ipc.new_stream(sink, table.schema) as writer: diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 3cd00e78a064..c9b76cf47f99 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -230,6 +230,9 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame": [Row(a=1, b='a'), Row(a=2, b='b'), Row(a=3, b='c')] """ + assert data is not None + if len(data) == 0: + raise ValueError("Input data cannot be empty") return DataFrame.withPlan(plan.LocalRelation(data), self) @property diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index d9ab95b0ccc6..21039ee59a04 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -140,6 +140,11 @@ def test_with_local_data(self): self.assertEqual(rows[0][0], 3) self.assertEqual(rows[0][1], "c") + # Check correct behavior for empty DataFrame + pdf = pandas.DataFrame({"a": []}) + with self.assertRaises(ValueError): + self.connect.createDataFrame(pdf) + def test_simple_explain_string(self): df = self.connect.read.table(self.tbl_name).limit(10) result = df.explain()