@@ -277,6 +277,9 @@ class SparkConnectPlanner(session: SparkSession) {
      val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
        Iterator(rel.getData.toByteArray),
        TaskContext.get())
      if (structType == null) {
        throw InvalidPlanInput("Input data for LocalRelation does not produce a schema.")
      }
      val attributes = structType.toAttributes
      val proj = UnsafeProjection.create(attributes, attributes)
      new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
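A quick illustration of when this guard matters (a sketch, not part of the PR): an empty pandas DataFrame still serializes to an Arrow IPC stream that carries a schema, so a null schema on the server indicates a malformed payload rather than merely empty data.

import pandas as pd
import pyarrow as pa

# Serialize an empty-but-typed pandas DataFrame the same way LocalRelation does.
table = pa.Table.from_pandas(pd.DataFrame({"a": pd.Series([], dtype="int64")}))
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    for batch in table.to_batches():  # zero batches for an empty table
        writer.write_batch(batch)

# Reading the stream back still yields the schema, even with zero rows.
reader = pa.ipc.open_stream(sink.getvalue())
print(reader.schema)  # a: int64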
32 changes: 31 additions & 1 deletion python/pyspark/sql/connect/plan.py
@@ -25,7 +25,8 @@
    TYPE_CHECKING,
    Mapping,
)

import pandas
import pyarrow as pa
import pyspark.sql.connect.proto as proto
from pyspark.sql.connect.column import (
    Column,
@@ -177,6 +178,35 @@ def _repr_html_(self) -> str:
        """


class LocalRelation(LogicalPlan):
    """Creates a LocalRelation plan object based on a pandas DataFrame."""

    def __init__(self, pdf: "pandas.DataFrame") -> None:
        super().__init__(None)
        self._pdf = pdf

    def plan(self, session: "SparkConnectClient") -> proto.Relation:
        sink = pa.BufferOutputStream()
        table = pa.Table.from_pandas(self._pdf)
        with pa.ipc.new_stream(sink, table.schema) as writer:
amaliujia (Contributor) commented on Nov 27, 2022:
I am not familiar with this area, so a question: is it possible that an empty pandas DataFrame is used here (e.g., one that has a schema but no data)? If so, maybe add a test case?

Contributor Author replied:
I'll add a test for that, thanks for the suggestion! (See test_with_local_data in test_connect_basic.py below.)
            for b in table.to_batches():
                writer.write_batch(b)

        plan = proto.Relation()
        plan.local_relation.data = sink.getvalue().to_pybytes()
        return plan

    def print(self, indent: int = 0) -> str:
        return f"{' ' * indent}<LocalRelation>\n"

    def _repr_html_(self) -> str:
        return """
        <ul>
            <li>LocalRelation</li>
        </ul>
        """


class ShowString(LogicalPlan):
    def __init__(
        self, child: Optional["LogicalPlan"], numRows: int, truncate: int, vertical: bool
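As a sanity check on the wire format produced by LocalRelation.plan above, here is a minimal round-trip sketch (not part of the PR) that mirrors what the server side ultimately does with local_relation.data:

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})

# Serialize exactly as LocalRelation.plan does.
table = pa.Table.from_pandas(pdf)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    for batch in table.to_batches():
        writer.write_batch(batch)
payload = sink.getvalue().to_pybytes()  # the bytes stored in local_relation.data

# Deserialize and verify the round trip.
restored = pa.ipc.open_stream(payload).read_all().to_pandas()
assert restored.equals(pdf)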
30 changes: 30 additions & 0 deletions python/pyspark/sql/connect/session.py
@@ -17,13 +17,15 @@

from threading import RLock
from typing import Optional, Any, Union, Dict, cast, overload
import pandas as pd

import pyspark.sql.types
from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.plan import SQL, Range
from pyspark.sql.connect.readwriter import DataFrameReader
from pyspark.sql.utils import to_str
from . import plan
from ._typing import OptionalPrimitiveType


@@ -205,6 +207,34 @@ def __init__(self, connectionString: str, userId: Optional[str] = None):
        # Create the reader
        self.read = DataFrameReader(self)

    def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
Member commented:
Actually, the implementation here doesn't match what we have in createDataFrame(pandas). By default, the Arrow message conversion (more specifically in https://github.com/apache/spark/pull/38659/files#diff-d630cc4be6c65a3c3f7d6dbfe990f99ba992ccc26d9c3aaf6cfe46e163cb7389R514-R521) has to happen in an RDD so we can parallelize it.

For a bit of history, PySpark added the initial version with RDD first, and later added this local relation as an optimization for small datasets (see also #36683).

Member commented:
I am fine with the current approach, but the main problems here are that 1. we can't stream the input, and 2. it will have a size limit (likely 4 KB). cc @hvanhovell FYI

Contributor Author replied:
It is impossible to match that implementation, because in PySpark a first serialization already happens in order to pass the input DataFrame to the executors for parallelization. In our case, even to send the data to Spark we have to serialize it.

That said, you're right that this currently does not support streaming local data from the client. But the limit is not 4 KB; it is probably whatever the max message size of gRPC is, so in the megabytes.

I think we need to add the client-side streaming APIs at some point, but I'd like to defer that for a bit.
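For context on the ceiling discussed above: gRPC's default maximum for received messages is 4 MB, and a Python client can raise it through standard channel options. A hedged sketch follows; the endpoint and the 128 MB figure are assumptions, and the option names are standard gRPC, not a Spark Connect API:

import grpc

MAX_MESSAGE_SIZE = 128 * 1024 * 1024  # hypothetical 128 MB ceiling

channel = grpc.insecure_channel(
    "localhost:15002",  # assumed Spark Connect endpoint
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
    ],
)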

Contributor commented:
For a large pandas DataFrame, I guess we can optimize it this way in the future: split it into several batches, create a LocalRelation for each batch, and finally union them (see the sketch below).
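A rough sketch of that batching idea, assuming the plan and DataFrame imports already present in session.py; plan.Union is a hypothetical plan node and the helper is illustrative only:

import pandas as pd

def create_dataframe_batched(session, pdf: pd.DataFrame, batch_size: int = 10_000) -> "DataFrame":
    # Split the pandas DataFrame into slices and build one LocalRelation per slice.
    slices = [pdf.iloc[i : i + batch_size] for i in range(0, len(pdf), batch_size)]
    relations = [plan.LocalRelation(s) for s in slices]
    # Fold the per-batch plans together with a hypothetical Union plan node.
    combined = relations[0]
    for rel in relations[1:]:
        combined = plan.Union(combined, rel)  # hypothetical plan node
    return DataFrame.withPlan(combined, session)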

"""
Creates a :class:`DataFrame` from a :class:`pandas.DataFrame`.

.. versionadded:: 3.4.0


Parameters
----------
data : :class:`pandas.DataFrame`

Returns
-------
:class:`DataFrame`

Examples
--------
>>> import pandas
>>> pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> self.connect.createDataFrame(pdf).collect()
[Row(a=1, b='a'), Row(a=2, b='b'), Row(a=3, b='c')]

"""
        assert data is not None
        if len(data) == 0:
            raise ValueError("Input data cannot be empty")
Contributor commented:
IIRC, createDataFrame in PySpark does not support an empty pandas DataFrame. I think it would be fine to throw an error here.

        return DataFrame.withPlan(plan.LocalRelation(data), self)

    @property
    def client(self) -> "SparkConnectClient":
        """
14 changes: 14 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -131,6 +131,20 @@ def conv_udf(x) -> str:
        result = df.select(u(df.id)).toPandas()
        self.assertIsNotNone(result)

    def test_with_local_data(self):
        """SPARK-41114: Test creating a DataFrame using local data."""
        pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
        df = self.connect.createDataFrame(pdf)
        rows = df.filter(df.a == lit(3)).collect()
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0][0], 3)
        self.assertEqual(rows[0][1], "c")

        # Check correct behavior for an empty DataFrame.
        pdf = pandas.DataFrame({"a": []})
        with self.assertRaises(ValueError):
            self.connect.createDataFrame(pdf)

    def test_simple_explain_string(self):
        df = self.connect.read.table(self.tbl_name).limit(10)
        result = df.explain()