Skip to content

Commit

Permalink
Reduce usages of data_columns. (#793)
Browse files Browse the repository at this point in the history
With this change, the following DataFrame APIs support multi-index columns:

- `applymap`
- `shift`
- `diff`
- `fillna`
- `rank`
  • Loading branch information
ueshin authored Sep 18, 2019
1 parent f7416f1 commit d0b4f5f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 33 deletions.
50 changes: 25 additions & 25 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ def _reduce_for_stat_function(self, sfun, name, axis=None, numeric_only=False):
if axis in ('index', 0, None):
exprs = []
num_args = len(signature(sfun).parameters)
for col, idx in zip(self._internal.data_columns, self._internal.column_index):
col_sdf = self._internal.scol_for(col)
col_type = self._internal.spark_type_for(col)
for idx in self._internal.column_index:
col_sdf = self._internal.scol_for(idx)
col_type = self._internal.spark_type_for(idx)

is_numeric_or_boolean = isinstance(col_type, (NumericType, BooleanType))
min_or_max = sfun.__name__ in ('min', 'max')
Expand Down Expand Up @@ -862,8 +862,8 @@ def applymap(self, func):
"""

applied = []
for column in self._internal.data_columns:
applied.append(self[column].apply(func))
for idx in self._internal.column_index:
applied.append(self[idx].apply(func))

sdf = self._sdf.select(
self._internal.index_scols + [c._scol for c in applied])
Expand Down Expand Up @@ -1732,7 +1732,7 @@ def empty(self):
>>> ks.DataFrame({}, index=list('abc')).empty
True
"""
return len(self._internal.data_columns) == 0 or self._sdf.rdd.isEmpty()
return len(self._internal.column_index) == 0 or self._sdf.rdd.isEmpty()

@property
def style(self):
Expand Down Expand Up @@ -2209,8 +2209,8 @@ def shift(self, periods=1, fill_value=None):
"""
applied = []
for column in self._internal.data_columns:
applied.append(self[column].shift(periods, fill_value))
for idx in self._internal.column_index:
applied.append(self[idx].shift(periods, fill_value))

sdf = self._sdf.select(
self._internal.index_scols + [c._scol for c in applied])
Expand Down Expand Up @@ -2291,8 +2291,8 @@ def diff(self, periods: int = 1, axis: Union[int, str] = 0):
if axis not in [0, 'index']:
raise ValueError('axis should be either 0 or "index" currently.')
applied = []
for column in self._internal.data_columns:
applied.append(self[column].diff(periods))
for idx in self._internal.column_index:
applied.append(self[idx].diff(periods))
sdf = self._sdf.select(
self._internal.index_scols + [c._scol for c in applied])
internal = self._internal.copy(sdf=sdf,
Expand Down Expand Up @@ -3307,16 +3307,17 @@ def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
for v in value.values():
if not isinstance(v, (float, int, str, bool)):
raise TypeError("Unsupported type %s" % type(v))
value = {self._internal.column_name_for(key): value for key, value in value.items()}
if limit is not None:
raise ValueError('limit parameter for value is not support now')
sdf = sdf.fillna(value)
internal = self._internal.copy(sdf=sdf)
else:
applied = []
for col in self._internal.data_columns:
applied.append(self[col].fillna(value=value, method=method, axis=axis,
for idx in self._internal.column_index:
applied.append(self[idx].fillna(value=value, method=method, axis=axis,
inplace=False, limit=limit))
sdf = self._sdf.select(self._internal.index_columns + [col._scol for col in applied])
sdf = self._sdf.select(self._internal.index_scols + [col._scol for col in applied])
internal = self._internal.copy(sdf=sdf,
data_columns=[col._internal.data_columns[0]
for col in applied],
Expand Down Expand Up @@ -4012,7 +4013,7 @@ def columns(self):
def columns(self, columns):
if isinstance(columns, pd.MultiIndex):
column_index = columns.tolist()
old_names = self._internal.data_columns
old_names = self._internal.column_index
if len(old_names) != len(column_index):
raise ValueError(
"Length mismatch: Expected axis has %d elements, new values have %d elements"
Expand All @@ -4021,7 +4022,7 @@ def columns(self, columns):
self._internal = self._internal.copy(column_index=column_index,
column_index_names=column_index_names)
else:
old_names = self._internal.data_columns
old_names = self._internal.column_index
if len(old_names) != len(columns):
raise ValueError(
"Length mismatch: Expected axis has %d elements, new values have %d elements"
Expand Down Expand Up @@ -4213,18 +4214,18 @@ def select_dtypes(self, include=None, exclude=None):

columns = []
column_index = []
for idx, col in zip(self._internal.column_index, self._internal.data_columns):
for idx in self._internal.column_index:
if len(include) > 0:
should_include = (
infer_dtype_from_object(self[idx].dtype.name) in include_numpy_type or
self._internal.spark_type_for(col) in include_spark_type)
self._internal.spark_type_for(idx) in include_spark_type)
else:
should_include = not (
infer_dtype_from_object(self[idx].dtype.name) in exclude_numpy_type or
self._internal.spark_type_for(col) in exclude_spark_type)
self._internal.spark_type_for(idx) in exclude_spark_type)

if should_include:
columns.append(col)
columns.append(self._internal.column_name_for(idx))
column_index.append(idx)

return DataFrame(self._internal.copy(
Expand Down Expand Up @@ -6062,13 +6063,12 @@ def _reindex_columns(self, columns):
if len(col) != level:
raise ValueError("shape (1,{}) doesn't match the shape (1,{})"
.format(len(col), level))
index_to_column = dict(zip(self._internal.column_index, self._internal.data_columns))
scols, columns, idx = [], [], []
null_columns = False
for label in label_columns:
if index_to_column.get(label, None) is not None:
scols.append(self._internal.scol_for(index_to_column[label]))
columns.append(index_to_column[label])
if label in self._internal.column_index:
scols.append(self._internal.scol_for(label))
columns.append(self._internal.column_name_for(label))
else:
scols.append(F.lit(np.nan).alias(str(label)))
columns.append(str(label))
Expand Down Expand Up @@ -6431,8 +6431,8 @@ def rank(self, method='average', ascending=True):
3 3.0 1.0
"""
applied = []
for column in self._internal.data_columns:
applied.append(self[column].rank(method=method, ascending=ascending))
for idx in self._internal.column_index:
applied.append(self[idx].rank(method=method, ascending=ascending))

sdf = self._sdf.select(self._internal.index_columns + [column._scol for column in applied])
internal = self._internal.copy(sdf=sdf,
Expand Down
4 changes: 2 additions & 2 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2608,7 +2608,7 @@ def _rank(self, method='average', ascending=True, part_cols=()):
asc_func = spark.functions.desc

index_column = self._internal.index_columns[0]
column_name = self.name
column_name = self._internal.data_columns[0]

if method == 'first':
window = Window.orderBy(
Expand All @@ -2633,7 +2633,7 @@ def _rank(self, method='average', ascending=True, part_cols=()):
*[column_name] + list(part_cols)
).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
scol = stat_func(F.row_number().over(window1)).over(window2)
kser = self._with_new_scol(scol).rename(column_name)
kser = self._with_new_scol(scol).rename(self.name)
return kser.astype(np.float64)

def describe(self, percentiles: Optional[List[float]] = None) -> 'Series':
Expand Down
46 changes: 40 additions & 6 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,21 @@ def test_fillna(self):
"Must specify a fillna 'value' or 'method' parameter."):
kdf.fillna()

# multi-index columns
pdf = pd.DataFrame({('x', 'a'): [np.nan, 2, 3, 4, np.nan, 6],
('x', 'b'): [1, 2, np.nan, 4, np.nan, np.nan],
('y', 'c'): [1, 2, 3, 4, np.nan, np.nan]},
index=[10, 20, 30, 40, 50, 60])
kdf = ks.from_pandas(pdf)

self.assert_eq(kdf.fillna(-1), pdf.fillna(-1))
self.assert_eq(kdf.fillna({('x', 'a'): -1, ('x', 'b'): -2, ('y', 'c'): -5}),
pdf.fillna({('x', 'a'): -1, ('x', 'b'): -2, ('y', 'c'): -5}))
self.assert_eq(pdf.fillna(method='ffill'), kdf.fillna(method='ffill'))
self.assert_eq(pdf.fillna(method='ffill', limit=2), kdf.fillna(method='ffill', limit=2))
self.assert_eq(pdf.fillna(method='bfill'), kdf.fillna(method='bfill'))
self.assert_eq(pdf.fillna(method='bfill', limit=2), kdf.fillna(method='bfill', limit=2))

def test_isnull(self):
pdf = pd.DataFrame({'x': [1, 2, 3, 4, None, 6], 'y': list('abdabd')},
index=[10, 20, 30, 40, 50, 60])
Expand Down Expand Up @@ -1428,8 +1443,6 @@ def test_rank(self):
pdf = pd.DataFrame(data={'col1': [1, 2, 3, 1], 'col2': [3, 4, 3, 1]},
columns=['col1', 'col2'])
kdf = ks.from_pandas(pdf)
self.assert_eq(pdf.rank(),
kdf.rank().sort_index())
self.assert_eq(pdf.rank(),
kdf.rank().sort_index())
self.assert_eq(pdf.rank(ascending=False),
Expand All @@ -1447,6 +1460,13 @@ def test_rank(self):
with self.assertRaisesRegex(ValueError, msg):
kdf.rank(method='nothing')

# multi-index columns
columns = pd.MultiIndex.from_tuples([('x', 'col1'), ('y', 'col2')])
pdf.columns = columns
kdf.columns = columns
self.assert_eq(pdf.rank(),
kdf.rank().sort_index())

def test_round(self):
pdf = pd.DataFrame({'A': [0.028208, 0.038683, 0.877076],
'B': [0.992815, 0.645646, 0.149370],
Expand All @@ -1472,15 +1492,22 @@ def test_shift(self):
kdf = ks.from_pandas(pdf)
self.assert_eq(pdf.shift(3), kdf.shift(3).sort_index())

pdf = pd.DataFrame({'Col1': [0, 0, 0, 10, 20],
'Col2': [0, 0, 0, 13, 23],
'Col3': [0, 0, 0, 17, 27]})
self.assert_eq(pdf,
# Need the expected result since pandas 0.23 does not support `fill_value` argument.
pdf1 = pd.DataFrame({'Col1': [0, 0, 0, 10, 20],
'Col2': [0, 0, 0, 13, 23],
'Col3': [0, 0, 0, 17, 27]})
self.assert_eq(pdf1,
kdf.shift(periods=3, fill_value=0).sort_index())
msg = "should be an int"
with self.assertRaisesRegex(ValueError, msg):
kdf.shift(1.5)

# multi-index columns
columns = pd.MultiIndex.from_tuples([('x', 'Col1'), ('x', 'Col2'), ('y', 'Col3')])
pdf.columns = columns
kdf.columns = columns
self.assert_eq(pdf.shift(3), kdf.shift(3).sort_index())

def test_diff(self):
pdf = pd.DataFrame({'a': [1, 2, 3, 4, 5, 6],
'b': [1, 1, 2, 3, 5, 8],
Expand All @@ -1496,6 +1523,13 @@ def test_diff(self):
with self.assertRaisesRegex(ValueError, msg):
kdf.diff(axis=1)

# multi-index columns
columns = pd.MultiIndex.from_tuples([('x', 'Col1'), ('x', 'Col2'), ('y', 'Col3')])
pdf.columns = columns
kdf.columns = columns
self.assert_eq(pdf.diff(),
kdf.diff().sort_index())

def test_duplicated(self):
pdf = pd.DataFrame({'a': [1, 1, 1, 3], 'b': [1, 1, 1, 4], 'c': [1, 1, 1, 5]})
kdf = ks.from_pandas(pdf)
Expand Down

0 comments on commit d0b4f5f

Please sign in to comment.