Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ message Relation {
Unpivot unpivot = 25;
ToSchema to_schema = 26;
RepartitionByExpression repartition_by_expression = 27;
FrameMap frame_map = 28;
MapPartitions map_partitions = 28;
CollectMetrics collect_metrics = 29;
Parse parse = 30;

Expand Down Expand Up @@ -780,11 +780,11 @@ message RepartitionByExpression {
optional int32 num_partitions = 3;
}

message FrameMap {
// (Required) Input relation for a Frame Map API: mapInPandas, mapInArrow.
message MapPartitions {
// (Required) Input relation for a mapPartitions-equivalent API: mapInPandas, mapInArrow.
Relation input = 1;

// (Required) Input user-defined function of a Frame Map API.
// (Required) Input user-defined function.
CommonInlineUserDefinedFunction func = 2;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class SparkConnectPlanner(val session: SparkSession) {
case proto.Relation.RelTypeCase.UNPIVOT => transformUnpivot(rel.getUnpivot)
case proto.Relation.RelTypeCase.REPARTITION_BY_EXPRESSION =>
transformRepartitionByExpression(rel.getRepartitionByExpression)
case proto.Relation.RelTypeCase.FRAME_MAP =>
transformFrameMap(rel.getFrameMap)
case proto.Relation.RelTypeCase.MAP_PARTITIONS =>
transformMapPartitions(rel.getMapPartitions)
case proto.Relation.RelTypeCase.COLLECT_METRICS =>
transformCollectMetrics(rel.getCollectMetrics)
case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
Expand Down Expand Up @@ -471,7 +471,7 @@ class SparkConnectPlanner(val session: SparkSession) {
.logicalPlan
}

private def transformFrameMap(rel: proto.FrameMap): LogicalPlan = {
private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
val commonUdf = rel.getFunc
val pythonUdf = transformPythonUDF(commonUdf)
pythonUdf.evalType match {
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,7 @@ def mapInPandas(
)

return DataFrame.withPlan(
plan.FrameMap(child=self._plan, function=udf_obj, cols=self.columns),
plan.MapPartitions(child=self._plan, function=udf_obj, cols=self.columns),
session=self._session,
)

Expand Down
8 changes: 4 additions & 4 deletions python/pyspark/sql/connect/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1901,8 +1901,8 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
return proto.Relation(catalog=proto.Catalog(list_catalogs=proto.ListCatalogs()))


class FrameMap(LogicalPlan):
"""Logical plan object for a Frame Map API: mapInPandas, mapInArrow."""
class MapPartitions(LogicalPlan):
"""Logical plan object for a mapPartitions-equivalent API: mapInPandas, mapInArrow."""

def __init__(
self, child: Optional["LogicalPlan"], function: "UserDefinedFunction", cols: List[str]
Expand All @@ -1914,8 +1914,8 @@ def __init__(
def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
plan = self._create_proto_relation()
plan.frame_map.input.CopyFrom(self._child.plan(session))
plan.frame_map.func.CopyFrom(self._func.to_plan_udf(session))
plan.map_partitions.input.CopyFrom(self._child.plan(session))
plan.map_partitions.func.CopyFrom(self._func.to_plan_udf(session))
return plan


Expand Down
240 changes: 120 additions & 120 deletions python/pyspark/sql/connect/proto/relations_pb2.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions python/pyspark/sql/connect/proto/relations_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Relation(google.protobuf.message.Message):
UNPIVOT_FIELD_NUMBER: builtins.int
TO_SCHEMA_FIELD_NUMBER: builtins.int
REPARTITION_BY_EXPRESSION_FIELD_NUMBER: builtins.int
FRAME_MAP_FIELD_NUMBER: builtins.int
MAP_PARTITIONS_FIELD_NUMBER: builtins.int
COLLECT_METRICS_FIELD_NUMBER: builtins.int
PARSE_FIELD_NUMBER: builtins.int
FILL_NA_FIELD_NUMBER: builtins.int
Expand Down Expand Up @@ -161,7 +161,7 @@ class Relation(google.protobuf.message.Message):
@property
def repartition_by_expression(self) -> global___RepartitionByExpression: ...
@property
def frame_map(self) -> global___FrameMap: ...
def map_partitions(self) -> global___MapPartitions: ...
@property
def collect_metrics(self) -> global___CollectMetrics: ...
@property
Expand Down Expand Up @@ -230,7 +230,7 @@ class Relation(google.protobuf.message.Message):
unpivot: global___Unpivot | None = ...,
to_schema: global___ToSchema | None = ...,
repartition_by_expression: global___RepartitionByExpression | None = ...,
frame_map: global___FrameMap | None = ...,
map_partitions: global___MapPartitions | None = ...,
collect_metrics: global___CollectMetrics | None = ...,
parse: global___Parse | None = ...,
fill_na: global___NAFill | None = ...,
Expand Down Expand Up @@ -281,8 +281,6 @@ class Relation(google.protobuf.message.Message):
b"fill_na",
"filter",
b"filter",
"frame_map",
b"frame_map",
"freq_items",
b"freq_items",
"hint",
Expand All @@ -293,6 +291,8 @@ class Relation(google.protobuf.message.Message):
b"limit",
"local_relation",
b"local_relation",
"map_partitions",
b"map_partitions",
"offset",
b"offset",
"parse",
Expand Down Expand Up @@ -376,8 +376,6 @@ class Relation(google.protobuf.message.Message):
b"fill_na",
"filter",
b"filter",
"frame_map",
b"frame_map",
"freq_items",
b"freq_items",
"hint",
Expand All @@ -388,6 +386,8 @@ class Relation(google.protobuf.message.Message):
b"limit",
"local_relation",
b"local_relation",
"map_partitions",
b"map_partitions",
"offset",
b"offset",
"parse",
Expand Down Expand Up @@ -467,7 +467,7 @@ class Relation(google.protobuf.message.Message):
"unpivot",
"to_schema",
"repartition_by_expression",
"frame_map",
"map_partitions",
"collect_metrics",
"parse",
"fill_na",
Expand Down Expand Up @@ -2706,17 +2706,17 @@ class RepartitionByExpression(google.protobuf.message.Message):

global___RepartitionByExpression = RepartitionByExpression

class FrameMap(google.protobuf.message.Message):
class MapPartitions(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

INPUT_FIELD_NUMBER: builtins.int
FUNC_FIELD_NUMBER: builtins.int
@property
def input(self) -> global___Relation:
"""(Required) Input relation for a Frame Map API: mapInPandas, mapInArrow."""
"""(Required) Input relation for a mapPartitions-equivalent API: mapInPandas, mapInArrow."""
@property
def func(self) -> pyspark.sql.connect.proto.expressions_pb2.CommonInlineUserDefinedFunction:
"""(Required) Input user-defined function of a Frame Map API."""
"""(Required) Input user-defined function."""
def __init__(
self,
*,
Expand All @@ -2731,7 +2731,7 @@ class FrameMap(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["func", b"func", "input", b"input"]
) -> None: ...

global___FrameMap = FrameMap
global___MapPartitions = MapPartitions

class CollectMetrics(google.protobuf.message.Message):
"""Collect arbitrary (named) metrics from a dataset."""
Expand Down