
Commit 1cb4ba0

ueshin authored and HyukjinKwon committed
Introduce column_scols in InternalFrame as a substitute for data_columns. (#956)
1 parent 06d6861 commit 1cb4ba0

13 files changed: +319 −196 lines
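The change is mechanical but wide: wherever an _InternalFrame was built with data_columns (a list of column-name strings), it is now built with column_scols (a list of Spark Column objects resolved against the frame's sdf). A minimal before/after sketch of the constructor call, assuming sdf is a PySpark DataFrame with data columns 'a' and 'b', and that scol_for is the name-to-Column helper imported from databricks.koalas.utils throughout this diff:

    from databricks.koalas.internal import _InternalFrame
    from databricks.koalas.utils import scol_for

    # Before this commit: data columns were tracked by name.
    # internal = _InternalFrame(sdf=sdf, data_columns=['a', 'b'])

    # After this commit: data columns are tracked as resolved Column objects.
    internal = _InternalFrame(
        sdf=sdf,
        column_scols=[scol_for(sdf, col) for col in ['a', 'b']])

Carrying Column objects rather than bare names also lets the internal frame hold column expressions that are not addressable by a plain name, which appears to be the point of the refactoring.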

databricks/koalas/frame.py (+99 −69)

Large diffs are not rendered by default.

databricks/koalas/generic.py (+6 −3)
@@ -33,7 +33,7 @@
 from databricks import koalas as ks  # For running doctests and reference resolution in PyCharm.
 from databricks.koalas.indexing import AtIndexer, ILocIndexer, LocIndexer
 from databricks.koalas.internal import _InternalFrame
-from databricks.koalas.utils import validate_arguments_and_invoke_function
+from databricks.koalas.utils import validate_arguments_and_invoke_function, scol_for
 from databricks.koalas.window import Rolling, Expanding

@@ -1416,15 +1416,18 @@ def median(self, accuracy=10000):
         # This code path cannot reuse `_reduce_for_stat_function` since there looks no proper way
         # to get a column name from Spark column but we need it to pass it through `expr`.
         kdf = kdf_or_kser
-        sdf = kdf._sdf
+        sdf = kdf._sdf.select(kdf._internal.scols)
         median = lambda name: F.expr("approx_percentile(`%s`, 0.5, %s)" % (name, accuracy))
         sdf = sdf.select([median(col).alias(col) for col in kdf._internal.data_columns])

         # Attach a dummy column for index to avoid default index.
         sdf = sdf.withColumn('__DUMMY__', F.monotonically_increasing_id())

         # This is expected to be small so it's fine to transpose.
-        return DataFrame(kdf._internal.copy(sdf=sdf, index_map=[('__DUMMY__', None)])) \
+        return DataFrame(kdf._internal.copy(
+            sdf=sdf,
+            index_map=[('__DUMMY__', None)],
+            column_scols=[scol_for(sdf, col) for col in kdf._internal.data_columns])) \
             ._to_internal_pandas().transpose().iloc[:, 0]

     # TODO: 'center', 'win_type', 'on', 'axis' parameter should be implemented.
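For reference, the approx_percentile expression this hunk builds per column can be exercised directly in plain PySpark. A self-contained sketch; the column name 'x' is illustrative and the accuracy literal 10000 matches the method's default:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.range(10).withColumnRenamed('id', 'x')

    # approx_percentile(col, 0.5, accuracy) computes an approximate median;
    # larger accuracy values trade memory for precision.
    median = sdf.select(
        F.expr('approx_percentile(`x`, 0.5, 10000)').alias('x')).head()[0]
    print(median)  # an approximate median of 0..9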

databricks/koalas/groupby.py (+44 −33)
@@ -219,8 +219,8 @@ def _spark_groupby(kdf, func, groupkeys):
         else:
             index_map = None
         return _InternalFrame(sdf=sdf,
-                              data_columns=data_columns,
                               column_index=column_index,
+                              column_scols=[scol_for(sdf, col) for col in data_columns],
                               index_map=index_map)

     def count(self):
@@ -493,10 +493,10 @@ def size(self):
         else:
             name = 'count'
         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[name],
                                   index_map=[(SPARK_INDEX_NAME_FORMAT(i),
                                               s._internal.column_index[0])
-                                             for i, s in enumerate(groupkeys)])
+                                             for i, s in enumerate(groupkeys)],
+                                  column_scols=[scol_for(sdf, name)])
         return _col(DataFrame(internal))

     def diff(self, periods=1):
@@ -893,7 +893,9 @@ def apply(self, func):

         if should_infer_schema:
             # If schema is inferred, we can restore indexes too.
-            internal = kdf._internal.copy(sdf=sdf)
+            internal = kdf._internal.copy(sdf=sdf,
+                                          column_scols=[scol_for(sdf, col)
+                                                        for col in kdf._internal.data_columns])
         else:
             # Otherwise, it loses index.
             internal = _InternalFrame(sdf=sdf)
@@ -945,7 +947,9 @@ def pandas_filter(pdf):

         sdf = self._spark_group_map_apply(
             pandas_filter, data_schema, retain_index=True)
-        return DataFrame(self._kdf._internal.copy(sdf=sdf))
+        return DataFrame(self._kdf._internal.copy(
+            sdf=sdf,
+            column_scols=[scol_for(sdf, col) for col in self._kdf._internal.data_columns]))

     def _spark_group_map_apply(self, func, return_schema, retain_index):
         index_columns = self._kdf._internal.index_columns
@@ -1153,13 +1157,13 @@ def idxmax(self, skipna=True):
             stat_exprs.append(F.max(scol_for(sdf, name)).alias(name))
         sdf = sdf.groupby(*groupkey_cols).agg(*stat_exprs)
         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[ks._internal.data_columns[0]
-                                                for ks in self._agg_columns],
-                                  column_index=[ks._internal.column_index[0]
-                                                for ks in self._agg_columns],
                                   index_map=[(SPARK_INDEX_NAME_FORMAT(i),
                                               s._internal.column_index[0])
-                                             for i, s in enumerate(groupkeys)])
+                                             for i, s in enumerate(groupkeys)],
+                                  column_index=[ks._internal.column_index[0]
+                                                for ks in self._agg_columns],
+                                  column_scols=[scol_for(sdf, ks._internal.data_columns[0])
+                                                for ks in self._agg_columns])
         return DataFrame(internal)

     # TODO: add axis parameter
@@ -1223,13 +1227,13 @@ def idxmin(self, skipna=True):
             stat_exprs.append(F.max(scol_for(sdf, name)).alias(name))
         sdf = sdf.groupby(*groupkey_cols).agg(*stat_exprs)
         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[ks._internal.data_columns[0]
-                                                for ks in self._agg_columns],
-                                  column_index=[ks._internal.column_index[0]
-                                                for ks in self._agg_columns],
                                   index_map=[(SPARK_INDEX_NAME_FORMAT(i),
                                               s._internal.column_index[0])
-                                             for i, s in enumerate(groupkeys)])
+                                             for i, s in enumerate(groupkeys)],
+                                  column_index=[ks._internal.column_index[0]
+                                                for ks in self._agg_columns],
+                                  column_scols=[scol_for(sdf, ks._internal.data_columns[0])
+                                                for ks in self._agg_columns])
         return DataFrame(internal)

     def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
@@ -1581,7 +1585,9 @@ def pandas_transform(pdf):
             sdf = self._spark_group_map_apply(
                 pandas_transform, return_schema, retain_index=True)
             # If schema is inferred, we can restore indexes too.
-            internal = kdf._internal.copy(sdf=sdf)
+            internal = kdf._internal.copy(sdf=sdf,
+                                          column_scols=[scol_for(sdf, col)
+                                                        for col in kdf._internal.data_columns])
         else:
             return_type = _infer_return_type(func).tpe
             data_columns = self._kdf._internal.data_columns
@@ -1708,8 +1714,8 @@ def _reduce_for_stat_function(self, sfun, only_numeric):
                                   index_map=[(SPARK_INDEX_NAME_FORMAT(i),
                                               s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)],
-                                  data_columns=data_columns,
                                   column_index=column_index,
+                                  column_scols=[scol_for(sdf, col) for col in data_columns],
                                   column_index_names=self._kdf._internal.column_index_names)
         kdf = DataFrame(internal)
         if not self._as_index:
@@ -1767,8 +1773,9 @@ def _diff(self, *args, **kwargs):

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
         internal = kdf._internal.copy(sdf=sdf,
-                                      data_columns=[c._internal.data_columns[0] for c in applied],
-                                      column_index=[c._internal.column_index[0] for c in applied])
+                                      column_index=[c._internal.column_index[0] for c in applied],
+                                      column_scols=[scol_for(sdf, c._internal.data_columns[0])
+                                                    for c in applied])
         return DataFrame(internal)

     def _rank(self, *args, **kwargs):
@@ -1781,8 +1788,9 @@ def _rank(self, *args, **kwargs):

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
         internal = kdf._internal.copy(sdf=sdf,
-                                      data_columns=[c._internal.data_columns[0] for c in applied],
-                                      column_index=[c._internal.column_index[0] for c in applied])
+                                      column_index=[c._internal.column_index[0] for c in applied],
+                                      column_scols=[scol_for(sdf, c._internal.data_columns[0])
+                                                    for c in applied])
         return DataFrame(internal)

     def _cum(self, func):
@@ -1806,8 +1814,9 @@ def _cum(self, func):
         sdf = kdf._sdf.select(
             kdf._internal.index_scols + [c._scol for c in applied])
         internal = kdf._internal.copy(sdf=sdf,
-                                      data_columns=[c._internal.data_columns[0] for c in applied],
-                                      column_index=[c._internal.column_index[0] for c in applied])
+                                      column_index=[c._internal.column_index[0] for c in applied],
+                                      column_scols=[scol_for(sdf, c._internal.data_columns[0])
+                                                    for c in applied])
         return DataFrame(internal)

     def _fillna(self, *args, **kwargs):
@@ -1820,8 +1829,9 @@ def _fillna(self, *args, **kwargs):

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
         internal = kdf._internal.copy(sdf=sdf,
-                                      data_columns=[c._internal.data_columns[0] for c in applied],
-                                      column_index=[c._internal.column_index[0] for c in applied])
+                                      column_index=[c._internal.column_index[0] for c in applied],
+                                      column_scols=[scol_for(sdf, c._internal.data_columns[0])
+                                                    for c in applied])
         return DataFrame(internal)

     def _shift(self, periods, fill_value):
@@ -1833,8 +1843,9 @@ def _shift(self, periods, fill_value):

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
         internal = kdf._internal.copy(sdf=sdf,
-                                      data_columns=[c._internal.data_columns[0] for c in applied],
-                                      column_index=[c._internal.column_index[0] for c in applied])
+                                      column_index=[c._internal.column_index[0] for c in applied],
+                                      column_scols=[scol_for(sdf, c._internal.data_columns[0])
+                                                    for c in applied])
         return DataFrame(internal)

@@ -1956,11 +1967,11 @@ def nsmallest(self, n=5):
         window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name))
         sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[name],
                                   index_map=([(s._internal.data_columns[0],
                                                s._internal.column_index[0])
                                               for s in self._groupkeys]
-                                             + self._kdf._internal.index_map))
+                                             + self._kdf._internal.index_map),
+                                  column_scols=[scol_for(sdf, name)])
         return _col(DataFrame(internal))

     # TODO: add keep parameter
@@ -2002,11 +2013,11 @@ def nlargest(self, n=5):
         window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name).desc())
         sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[name],
                                   index_map=([(s._internal.data_columns[0],
                                                s._internal.column_index[0])
                                               for s in self._groupkeys]
-                                             + self._kdf._internal.index_map))
+                                             + self._kdf._internal.index_map),
+                                  column_scols=[scol_for(sdf, name)])
         return _col(DataFrame(internal))

     # TODO: add bins, normalize parameter
@@ -2064,10 +2075,10 @@ def value_counts(self, sort=None, ascending=None, dropna=True):
         sdf = sdf.orderBy(F.col(agg_column).desc())

         internal = _InternalFrame(sdf=sdf,
-                                  data_columns=[agg_column],
                                   index_map=[(SPARK_INDEX_NAME_FORMAT(i),
                                               s._internal.column_index[0])
-                                             for i, s in enumerate(groupkeys)])
+                                             for i, s in enumerate(groupkeys)],
+                                  column_scols=[scol_for(sdf, agg_column)])
         return _col(DataFrame(internal))

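Nearly all of the groupby.py hunks repeat one idiom: select the index columns plus the transformed data columns into a new sdf, then re-resolve the data columns against that new sdf when copying the internal frame. A condensed sketch of the shape shared by the _diff/_rank/_cum/_fillna/_shift changes, with names as in the diff (applied is a list of transformed Koalas Series):

    sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
    internal = kdf._internal.copy(
        sdf=sdf,
        column_index=[c._internal.column_index[0] for c in applied],
        # Re-resolve each data column against the new sdf: a Spark Column is
        # tied to the plan that produced it, so the Columns taken from the
        # old sdf are not simply reused here.
        column_scols=[scol_for(sdf, c._internal.data_columns[0])
                      for c in applied])
    return DataFrame(internal)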

databricks/koalas/indexes.py (+2 −2)
@@ -88,8 +88,8 @@ def __init__(self, data: Union[DataFrame, list], dtype=None, name=None,
         if scol is None:
             scol = kdf._internal.index_scols[0]
         internal = kdf._internal.copy(scol=scol,
-                                      data_columns=kdf._internal.index_columns,
                                       column_index=kdf._internal.index_names,
+                                      column_scols=kdf._internal.index_scols,
                                       column_index_names=None)
         IndexOpsMixin.__init__(self, internal, kdf)

@@ -139,7 +139,7 @@ def to_pandas(self) -> pd.Index:
         internal = self._kdf._internal.copy(
             sdf=sdf,
             index_map=[(sdf.schema[0].name, self._kdf._internal.index_names[0])],
-            data_columns=[], column_index=[], column_index_names=None)
+            column_index=[], column_scols=[], column_index_names=None)
         return DataFrame(internal)._to_internal_pandas().index

     toPandas = to_pandas
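The net effect of the two indexes.py hunks: an Index is modeled as a frame whose data columns are the index columns themselves, and to_pandas builds an index-only frame by passing empty column_index and column_scols lists. Restated compactly, directly mirroring the first hunk:

    internal = kdf._internal.copy(
        scol=scol,
        column_index=kdf._internal.index_names,
        # The index columns double as the Index's data columns.
        column_scols=kdf._internal.index_scols,
        column_index_names=None)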

databricks/koalas/indexing.py (+9 −11)
@@ -28,6 +28,7 @@

 from databricks.koalas.internal import _InternalFrame
 from databricks.koalas.exceptions import SparkPandasIndexingError, SparkPandasNotImplementedError
+from databricks.koalas.utils import scol_for


 def _make_col(c):
@@ -437,7 +438,7 @@ def raiseNotImplemented(description):
             cols_sel = None

         if cols_sel is None:
-            columns = self._kdf._internal.data_scols
+            columns = self._kdf._internal.column_scols
         elif isinstance(cols_sel, spark.Column):
             columns = [cols_sel]
             column_index = None
@@ -475,9 +476,9 @@ def raiseNotImplemented(description):
             sdf = sdf.select(self._kdf._internal.index_scols + columns)
             index_columns = self._kdf._internal.index_columns
             data_columns = [column for column in sdf.columns if column not in index_columns]
-            internal = _InternalFrame(
-                sdf=sdf, data_columns=data_columns,
-                index_map=self._kdf._internal.index_map, column_index=column_index)
+            column_scols = [scol_for(sdf, col) for col in data_columns]
+            internal = _InternalFrame(sdf=sdf, index_map=self._kdf._internal.index_map,
+                                      column_index=column_index, column_scols=column_scols)
             kdf = DataFrame(internal)
         except AnalysisException:
             raise KeyError('[{}] don\'t exist in columns'
@@ -710,13 +711,13 @@ def raiseNotImplemented(description):
         if isinstance(cols_sel, Series):
             columns = [cols_sel._scol]
         elif isinstance(cols_sel, int):
-            columns = [self._kdf._internal.data_scols[cols_sel]]
+            columns = [self._kdf._internal.column_scols[cols_sel]]
         elif cols_sel is None or cols_sel == slice(None):
-            columns = self._kdf._internal.data_scols
+            columns = self._kdf._internal.column_scols
         elif isinstance(cols_sel, slice):
             if all(s is None or isinstance(s, int)
                    for s in (cols_sel.start, cols_sel.stop, cols_sel.step)):
-                columns = self._kdf._internal.data_scols[cols_sel]
+                columns = self._kdf._internal.column_scols[cols_sel]
             else:
                 not_none = cols_sel.start if cols_sel.start is not None \
                     else cols_sel.stop if cols_sel.stop is not None else cols_sel.step
@@ -733,10 +734,7 @@ def raiseNotImplemented(description):

         try:
             sdf = sdf.select(self._kdf._internal.index_scols + columns)
-            index_columns = self._kdf._internal.index_columns
-            data_columns = [column for column in sdf.columns if column not in index_columns]
-            internal = _InternalFrame(
-                sdf=sdf, data_columns=data_columns, index_map=self._kdf._internal.index_map)
+            internal = _InternalFrame(sdf=sdf, index_map=self._kdf._internal.index_map)
             kdf = DataFrame(internal)
         except AnalysisException:
             raise KeyError('[{}] don\'t exist in columns'
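The positional branches above back .iloc column selection: an int picks a single entry of column_scols, None or slice(None) takes all of them, and an all-int slice falls through to ordinary Python list slicing. A hypothetical user-level example of what these branches serve, assuming a standard Koalas session:

    import databricks.koalas as ks

    kdf = ks.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
    kdf.iloc[:, 1]      # int: the single column 'b'
    kdf.iloc[:, 0:2]    # slice of ints: columns 'a' and 'b'
    kdf.iloc[:, :]      # slice(None): all data columns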
