Commit 720a36a
Introduce spark.analyzed to force resolve InternalFrame. (#1862)
Introduces `spark.analyzed` to force-resolve the underlying `InternalFrame`. After many chained operations, the underlying Spark plan can grow huge and make the Spark planner take a long time to finish planning. This property provides a workaround: it resolves the accumulated plan so that subsequent operations start from the analyzed plan.
1 parent 2d0b2f2 commit 720a36a
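
A minimal usage sketch (editor's addition, not part of the commit; the frame contents and iteration count are illustrative): after a long chain of operations, `spark.analyzed` returns a frame whose plan has been force-resolved, so later operations no longer build on the entire accumulated lineage.

    import databricks.koalas as ks

    kdf = ks.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

    # Each step keeps growing the underlying Spark plan.
    for _ in range(50):  # illustrative iteration count
        kdf = kdf + 1

    # Force-resolve the accumulated plan; the data is unchanged, but the
    # Spark planner no longer has to re-plan the whole chain each time.
    kdf = kdf.spark.analyzed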

2 files changed (+149 -19)

databricks/koalas/spark/accessors.py (+105 -1)
@@ -196,6 +196,59 @@ def apply(self, func) -> "ks.Series":
         # Lose index.
         return first_series(DataFrame(sdf)).rename(self._data.name)
 
+    @property
+    def analyzed(self) -> "ks.Series":
+        """
+        Returns a new Series with the analyzed Spark DataFrame.
+
+        After multiple operations, the underlying Spark plan could grow huge
+        and make the Spark planner take a long time to finish the planning.
+
+        This function is a workaround to avoid it.
+
+        .. note:: After analyzed, operations between the analyzed Series and the original one
+            will **NOT** work without setting the config `compute.ops_on_diff_frames` to `True`.
+
+        Returns
+        -------
+        Series
+
+        Examples
+        --------
+        >>> ser = ks.Series([1, 2, 3])
+        >>> ser
+        0    1
+        1    2
+        2    3
+        dtype: int64
+
+        The analyzed one should return the same value.
+
+        >>> ser.spark.analyzed
+        0    1
+        1    2
+        2    3
+        dtype: int64
+
+        However, it won't work with the same anchor Series.
+
+        >>> ser + ser.spark.analyzed
+        Traceback (most recent call last):
+        ...
+        ValueError: ... enable 'compute.ops_on_diff_frames' option.
+
+        >>> with ks.option_context('compute.ops_on_diff_frames', True):
+        ...     (ser + ser.spark.analyzed).sort_index()
+        0    2
+        1    4
+        2    6
+        dtype: int64
+        """
+        from databricks.koalas.frame import DataFrame
+        from databricks.koalas.series import first_series
+
+        return first_series(DataFrame(self._data._internal.resolved_copy))
+
 
 class SparkIndexMethods(SparkIndexOpsMethods):
     def transform(self, func) -> "ks.Index":
@@ -817,7 +870,6 @@ def apply(self, func, index_col: Optional[Union[str, List[str]]] = None) -> "ks.
 
         Examples
         --------
-        >>> from databricks import koalas as ks
         >>> kdf = ks.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
         >>> kdf
            a  b
@@ -851,6 +903,58 @@ def apply(self, func, index_col: Optional[Union[str, List[str]]] = None) -> "ks.
         )
         return output.to_koalas(index_col)
 
+    @property
+    def analyzed(self) -> "ks.DataFrame":
+        """
+        Returns a new DataFrame with the analyzed Spark DataFrame.
+
+        After multiple operations, the underlying Spark plan could grow huge
+        and make the Spark planner take a long time to finish the planning.
+
+        This function is a workaround to avoid it.
+
+        .. note:: After analyzed, operations between the analyzed DataFrame and the original one
+            will **NOT** work without setting the config `compute.ops_on_diff_frames` to `True`.
+
+        Returns
+        -------
+        DataFrame
+
+        Examples
+        --------
+        >>> df = ks.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
+        >>> df
+           a  b
+        0  1  4
+        1  2  5
+        2  3  6
+
+        The analyzed one should return the same value.
+
+        >>> df.spark.analyzed
+           a  b
+        0  1  4
+        1  2  5
+        2  3  6
+
+        However, it won't work with the same anchor DataFrame.
+
+        >>> df + df.spark.analyzed
+        Traceback (most recent call last):
+        ...
+        ValueError: ... enable 'compute.ops_on_diff_frames' option.
+
+        >>> with ks.option_context('compute.ops_on_diff_frames', True):
+        ...     (df + df.spark.analyzed).sort_index()
+           a   b
+        0  2   8
+        1  4  10
+        2  6  12
+        """
+        from databricks.koalas.frame import DataFrame
+
+        return DataFrame(self._kdf._internal.resolved_copy)
+
 
 class CachedSparkFrameMethods(SparkFrameMethods):
     """Spark related features for cached DataFrame. This is usually created via

databricks/koalas/utils.py (+44 -18)
@@ -87,6 +87,7 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
     from databricks.koalas.frame import DataFrame
     from databricks.koalas.internal import (
         InternalFrame,
+        HIDDEN_COLUMNS,
         NATURAL_ORDER_COLUMN_NAME,
         SPARK_INDEX_NAME_FORMAT,
     )
@@ -108,8 +109,37 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
         raise AssertionError("args should be single DataFrame or " "single/multiple Series")
 
     if get_option("compute.ops_on_diff_frames"):
-        this_index_map = this._internal.index_map
-        that_index_map = that._internal.index_map
+
+        def resolve(internal, side):
+            rename = lambda col: "__{}_{}".format(side, col)
+            internal = internal.resolved_copy
+            sdf = internal.spark_frame
+            sdf = internal.spark_frame.select(
+                [
+                    scol_for(sdf, col).alias(rename(col))
+                    for col in sdf.columns
+                    if col not in HIDDEN_COLUMNS
+                ]
+                + list(HIDDEN_COLUMNS)
+            )
+            return internal.copy(
+                spark_frame=sdf,
+                index_map=OrderedDict(
+                    zip(
+                        [rename(col) for col in internal.index_spark_column_names],
+                        internal.index_names,
+                    )
+                ),
+                data_spark_columns=[
+                    scol_for(sdf, rename(col)) for col in internal.data_spark_column_names
+                ],
+            )
+
+        this_internal = resolve(this._internal, "this")
+        that_internal = resolve(that._internal, "that")
+
+        this_index_map = this_internal.index_map
+        that_index_map = that_internal.index_map
         assert len(this_index_map) == len(that_index_map)
 
         join_scols = []
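
The new `resolve` helper above force-resolves each input and renames its columns with a per-side prefix before the join. A standalone PySpark sketch (editor's addition; the schema is illustrative) of why the renaming matters when both plans expose the same column names:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.range(3).withColumn("a", F.col("id") * 2)

    # Joining two plans that share column names would make references like
    # "id" ambiguous; renaming each side up front keeps every reference unique.
    this_sdf = sdf.select([F.col(c).alias("__this_" + c) for c in sdf.columns])
    that_sdf = sdf.select([F.col(c).alias("__that_" + c) for c in sdf.columns])

    joined = this_sdf.join(that_sdf, F.col("__this_id") == F.col("__that_id"), "full")
    joined.show()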
@@ -119,8 +149,8 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
         # level.
         this_and_that_index_map = zip(this_index_map.items(), that_index_map.items())
 
-        this_sdf = this._internal.resolved_copy.spark_frame.alias("this")
-        that_sdf = that._internal.resolved_copy.spark_frame.alias("that")
+        this_sdf = this_internal.spark_frame.alias("this")
+        that_sdf = that_internal.spark_frame.alias("that")
 
         # If the same named index is found, that's used.
         index_column_names = []
@@ -155,16 +185,12 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
         joined_df = joined_df.select(
             merged_index_scols
             + [
-                scol_for(this_sdf, this._internal.spark_column_name_for(label)).alias(
-                    "__this_%s" % this._internal.spark_column_name_for(label)
-                )
-                for label in this._internal.column_labels
+                scol_for(this_sdf, this_internal.spark_column_name_for(label))
+                for label in this_internal.column_labels
             ]
             + [
-                scol_for(that_sdf, that._internal.spark_column_name_for(label)).alias(
-                    "__that_%s" % that._internal.spark_column_name_for(label)
-                )
-                for label in that._internal.column_labels
+                scol_for(that_sdf, that_internal.spark_column_name_for(label))
+                for label in that_internal.column_labels
             ]
             + order_column
         )
@@ -175,7 +201,7 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
             for col in joined_df.columns
             if col not in index_columns and col != NATURAL_ORDER_COLUMN_NAME
         ]
-        level = max(this._internal.column_labels_level, that._internal.column_labels_level)
+        level = max(this_internal.column_labels_level, that_internal.column_labels_level)
 
         def fill_label(label):
             if label is None:
@@ -184,15 +210,15 @@ def fill_label(label):
                 return ([""] * (level - len(label))) + list(label)
 
         column_labels = [
-            tuple(["this"] + fill_label(label)) for label in this._internal.column_labels
-        ] + [tuple(["that"] + fill_label(label)) for label in that._internal.column_labels]
+            tuple(["this"] + fill_label(label)) for label in this_internal.column_labels
+        ] + [tuple(["that"] + fill_label(label)) for label in that_internal.column_labels]
         column_label_names = (
-            [None] * (1 + level - this._internal.column_labels_level)
-        ) + this._internal.column_label_names
+            [None] * (1 + level - this_internal.column_labels_level)
+        ) + this_internal.column_label_names
         return DataFrame(
             InternalFrame(
                 spark_frame=joined_df,
-                index_map=OrderedDict(zip(index_column_names, this._internal.index_names)),
+                index_map=OrderedDict(zip(index_column_names, this_internal.index_names)),
                 column_labels=column_labels,
                 data_spark_columns=[scol_for(joined_df, col) for col in new_data_columns],
                 column_label_names=column_label_names,
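
Net effect of the utils.py changes: both inputs to combine_frames are resolved and prefixed once, up front, instead of being aliased inside the later select. From the user's side this is transparent (a sketch; the frames and values are illustrative):

    import databricks.koalas as ks

    with ks.option_context("compute.ops_on_diff_frames", True):
        kdf1 = ks.DataFrame({"a": [1, 2, 3]})
        kdf2 = ks.DataFrame({"a": [4, 5, 6]})
        print((kdf1 + kdf2).sort_index())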
