diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 9c4299d652fd..041ab5447d77 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -277,6 +277,9 @@ class SparkConnectPlanner(session: SparkSession) {
     val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
       Iterator(rel.getData.toByteArray),
       TaskContext.get())
+    if (structType == null) {
+      throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.")
+    }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
     new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 9a22d6ea38ec..c6eeea957aaa 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -25,7 +25,8 @@
     TYPE_CHECKING,
     Mapping,
 )
-
+import pandas
+import pyarrow as pa
 import pyspark.sql.connect.proto as proto
 from pyspark.sql.connect.column import (
     Column,
@@ -177,6 +178,35 @@ def _repr_html_(self) -> str:
         """
 
 
+class LocalRelation(LogicalPlan):
+    """Creates a LocalRelation plan object based on a Pandas DataFrame."""
+
+    def __init__(self, pdf: "pandas.DataFrame") -> None:
+        super().__init__(None)
+        self._pdf = pdf
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        sink = pa.BufferOutputStream()
+        table = pa.Table.from_pandas(self._pdf)
+        with pa.ipc.new_stream(sink, table.schema) as writer:
+            for b in table.to_batches():
+                writer.write_batch(b)
+
+        plan = proto.Relation()
+        plan.local_relation.data = sink.getvalue().to_pybytes()
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        return f"{' ' * indent}<LocalRelation>\n"
+
+    def _repr_html_(self) -> str:
+        return """
+        <ul>
+            <li><b>LocalRelation</b></li>
+        </ul>
+        """
+
+
 class ShowString(LogicalPlan):
     def __init__(
         self, child: Optional["LogicalPlan"], numRows: int, truncate: int, vertical: bool
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 92f58140eacc..c9b76cf47f99 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -17,6 +17,7 @@
 from threading import RLock
 from typing import Optional, Any, Union, Dict, cast, overload
 
+import pandas as pd
 import pyspark.sql.types
 from pyspark.sql.connect.client import SparkConnectClient
@@ -24,6 +25,7 @@
 from pyspark.sql.connect.plan import SQL, Range
 from pyspark.sql.connect.readwriter import DataFrameReader
 from pyspark.sql.utils import to_str
+from . import plan
 from ._typing import OptionalPrimitiveType
@@ -205,6 +207,34 @@ def __init__(self, connectionString: str, userId: Optional[str] = None):
         # Create the reader
         self.read = DataFrameReader(self)
 
+    def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
+        """
+        Creates a :class:`DataFrame` from a :class:`pandas.DataFrame`.
+
+        .. versionadded:: 3.4.0
+
+
+        Parameters
+        ----------
+        data : :class:`pandas.DataFrame`
+
+        Returns
+        -------
+        :class:`DataFrame`
+
+        Examples
+        --------
+        >>> import pandas
+        >>> pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
+        >>> self.connect.createDataFrame(pdf).collect()
+        [Row(a=1, b='a'), Row(a=2, b='b'), Row(a=3, b='c')]
+
+        """
+        assert data is not None
+        if len(data) == 0:
+            raise ValueError("Input data cannot be empty")
+        return DataFrame.withPlan(plan.LocalRelation(data), self)
+
     @property
     def client(self) -> "SparkConnectClient":
         """
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 150bbdb65ef1..21039ee59a04 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -131,6 +131,20 @@ def conv_udf(x) -> str:
         result = df.select(u(df.id)).toPandas()
         self.assertIsNotNone(result)
 
+    def test_with_local_data(self):
+        """SPARK-41114: Test creating a dataframe using local data"""
+        pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
+        df = self.connect.createDataFrame(pdf)
+        rows = df.filter(df.a == lit(3)).collect()
+        self.assertTrue(len(rows) == 1)
+        self.assertEqual(rows[0][0], 3)
+        self.assertEqual(rows[0][1], "c")
+
+        # Check correct behavior for empty DataFrame
+        pdf = pandas.DataFrame({"a": []})
+        with self.assertRaises(ValueError):
+            self.connect.createDataFrame(pdf)
+
     def test_simple_explain_string(self):
         df = self.connect.read.table(self.tbl_name).limit(10)
         result = df.explain()
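
Note on the wire format (commentary, not part of the patch): LocalRelation.plan serializes the pandas.DataFrame into an Arrow IPC stream, and the server reconstructs the rows and schema from those bytes via ArrowConverters.fromBatchWithSchemaIterator. A minimal sketch of that round trip, using pandas and pyarrow alone; pa.ipc.open_stream here merely stands in for the server-side decoder and is an assumption for illustration, not part of this change:

    import pandas
    import pyarrow as pa

    pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})

    # Client side, mirroring LocalRelation.plan: encode the DataFrame as an
    # Arrow IPC stream and extract the raw bytes that would be sent as
    # proto.Relation.local_relation.data.
    sink = pa.BufferOutputStream()
    table = pa.Table.from_pandas(pdf)
    with pa.ipc.new_stream(sink, table.schema) as writer:
        for batch in table.to_batches():
            writer.write_batch(batch)
    data = sink.getvalue().to_pybytes()

    # Server side, conceptually: decode the bytes back into record batches
    # plus a schema (the role ArrowConverters.fromBatchWithSchemaIterator
    # plays in the planner).
    reader = pa.ipc.open_stream(data)
    restored = reader.read_all()
    assert restored.schema.equals(table.schema)
    print(restored.to_pandas())

Both ends defend against empty input: the client raises ValueError before serializing anything, and the planner throws InvalidPlanInput when no schema can be derived from the received bytes.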