Fix concat for multi-index columns support. #680

Merged 3 commits on Aug 23, 2019
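This PR teaches `ks.concat` to handle DataFrames whose columns form a `pd.MultiIndex`, matching pandas. A minimal sketch of the newly supported usage, mirroring the tests added at the bottom of this diff (assumes a `databricks.koalas` installation with a local Spark session):

    import pandas as pd
    import databricks.koalas as ks

    pdf = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
    pdf.columns = pd.MultiIndex.from_tuples([('X', 'A'), ('X', 'B')])
    kdf = ks.from_pandas(pdf)

    # Previously failed for MultiIndex columns; now mirrors pandas:
    ks.concat([kdf, kdf])                              # keeps the original index
    ks.concat([kdf, kdf[[('X', 'A')]]], join="inner")  # intersects columns
    ks.concat([kdf, kdf], ignore_index=True)           # renumbers rows 0..n-1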
databricks/koalas/indexing.py (10 additions, 1 deletion)

@@ -434,15 +434,24 @@ def raiseNotImplemented(description):
         elif all(isinstance(key, Series) for key in cols_sel):
             columns = [_make_col(key) for key in cols_sel]
             column_index = None
+        elif (any(isinstance(key, str) for key in cols_sel)
+                and any(isinstance(key, tuple) for key in cols_sel)):
+            raise TypeError('Expected tuple, got str')
         else:
+            if all(isinstance(key, tuple) for key in cols_sel):
+                level = self._kdf._internal.column_index_level
+                if any(len(key) != level for key in cols_sel):
+                    raise ValueError('All the key level should be the same as column index level.')
+
             column_to_index = list(zip(self._kdf._internal.data_columns,
                                        self._kdf._internal.column_index))
+
             columns = []
             column_index = []
             for key in cols_sel:
                 found = False
                 for column, idx in column_to_index:
-                    if idx[0] == key:
+                    if idx == key or idx[0] == key:
                         columns.append(_make_col(column))
                         column_index.append(idx)
                         found = True
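With this change, `.loc` column selection accepts full tuple keys against MultiIndex columns instead of matching only on the first level, and mixing strings with tuples is rejected. A sketch, assuming `kdf` is the MultiIndex-columns frame from the example above:

    kdf.loc[:, [('X', 'A')]]       # full-tuple key now matches via idx == key
    kdf.loc[:, ['X']]              # first-level match still works (idx[0] == key)
    kdf.loc[:, ['X', ('X', 'A')]]  # raises TypeError('Expected tuple, got str')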
databricks/koalas/internal.py (0 additions, 2 deletions)

@@ -365,8 +365,6 @@ def __init__(self, sdf: spark.DataFrame,
         assert isinstance(sdf, spark.DataFrame)
         if index_map is None:
             # Here is when Koalas DataFrame is created directly from Spark DataFrame.
-            assert column_index is None
-            assert column_index_names is None
             assert "__index_level_0__" not in sdf.schema.names, \
                 "Default index column should not appear in columns of the Spark DataFrame"
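These asserts assumed a frame with the default index never carries column-index metadata. The reworked `concat` below copies an internal frame with `index_map=None` (for `ignore_index=True`) while keeping its `column_index`, which would have tripped them. A rough illustration of the now-allowed combination, using the names from the concat diff below (internal API, for illustration only):

    # index_map=None requests the default index; column_index is preserved.
    internal = kdfs[0]._internal.copy(sdf=concatenated, index_map=None)
    result_kdf = DataFrame(internal)  # default index, MultiIndex columns kept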
databricks/koalas/namespace.py (44 additions, 49 deletions)

@@ -20,6 +20,7 @@
 from typing import Optional, Union
 from collections import OrderedDict
 from collections.abc import Iterable
+from functools import reduce
 import itertools

 import numpy as np
@@ -1130,17 +1131,19 @@ def concat(objs, axis=0, join='outer', ignore_index=False):
        0  c  3
        1  d  4
     """
-    if not isinstance(objs, (dict, Iterable)):
+    if not isinstance(objs, Iterable):  # TODO: support dict
         raise TypeError('first argument must be an iterable of koalas '
                         'objects, you passed an object of type '
                         '"{name}"'.format(name=type(objs).__name__))

     if axis not in [0, 'index']:
         raise ValueError('axis should be either 0 or "index" currently.')

-    if all(map(lambda obj: obj is None, objs)):
-        raise ValueError("All objects passed were None")
+    if len(objs) == 0:
+        raise ValueError('No objects to concatenate')
+    objs = list(filter(lambda obj: obj is not None, objs))
+    if len(objs) == 0:
+        raise ValueError('All objects passed were None')

     for obj in objs:
         if not isinstance(obj, (Series, DataFrame)):
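The reworked validation rejects an empty list up front and drops `None` entries before checking whether anything is left. A sketch of the resulting behavior (hypothetical REPL session; `kdf` is any Koalas DataFrame):

    ks.concat([])             # ValueError: No objects to concatenate
    ks.concat([None, None])   # ValueError: All objects passed were None
    ks.concat([kdf, None])    # None is dropped; kdf is concatenated alone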
@@ -1156,88 +1159,80 @@
     new_objs = []
     for obj in objs:
         if isinstance(obj, Series):
-            obj = obj.to_dataframe()
+            obj = obj.rename('0').to_dataframe()
         new_objs.append(obj)
     objs = new_objs

+    column_index_levels = set(obj._internal.column_index_level for obj in objs)
+    if len(column_index_levels) != 1:
+        raise ValueError('MultiIndex columns should have the same levels')
+
     # DataFrame, DataFrame, ...
     # All Series are converted into DataFrame and then compute concat.
     if not ignore_index:
-        indices_of_kdfs = [kdf._internal.index_map for kdf in objs]
+        indices_of_kdfs = [kdf.index for kdf in objs]
         index_of_first_kdf = indices_of_kdfs[0]
         for index_of_kdf in indices_of_kdfs:
-            if index_of_first_kdf != index_of_kdf:
+            if index_of_first_kdf.names != index_of_kdf.names:
                 raise ValueError(
                     'Index type and names should be same in the objects to concatenate. '
                     'You passed different indices '
                     '{index_of_first_kdf} and {index_of_kdf}'.format(
-                        index_of_first_kdf=index_of_first_kdf, index_of_kdf=index_of_kdf))
+                        index_of_first_kdf=index_of_first_kdf.names,
+                        index_of_kdf=index_of_kdf.names))

-    columns_of_kdfs = [kdf._internal.columns for kdf in objs]
-    first_kdf = objs[0]
+    column_indexes_of_kdfs = [kdf._internal.column_index for kdf in objs]
     if ignore_index:
-        columns_of_first_kdf = first_kdf._internal.data_columns
+        index_names_of_kdfs = [[] for _ in objs]
     else:
-        columns_of_first_kdf = first_kdf._internal.columns
-    if all(current_kdf == columns_of_first_kdf for current_kdf in columns_of_kdfs):
+        index_names_of_kdfs = [kdf._internal.index_names for kdf in objs]
+    if (all(name == index_names_of_kdfs[0] for name in index_names_of_kdfs)
+            and all(idx == column_indexes_of_kdfs[0] for idx in column_indexes_of_kdfs)):
         # If all columns are in the same order and values, use it.
         kdfs = objs
+        merged_columns = column_indexes_of_kdfs[0]
     else:
-        if ignore_index:
-            columns_to_apply = [kdf._internal.data_columns for kdf in objs]
-        else:
-            columns_to_apply = [kdf._internal.columns for kdf in objs]
-
         if join == "inner":
-            interested_columns = set.intersection(*map(set, columns_to_apply))
+            interested_columns = set.intersection(*map(set, column_indexes_of_kdfs))
             # Keep the column order with its firsts DataFrame.
-            interested_columns = list(map(
-                lambda c: columns_of_first_kdf[columns_of_first_kdf.index(c)],
-                interested_columns))
+            merged_columns = sorted(list(map(
+                lambda c: column_indexes_of_kdfs[0][column_indexes_of_kdfs[0].index(c)],
+                interested_columns)))

-            kdfs = []
-            for kdf in objs:
-                sdf = kdf._sdf.select(interested_columns)
-                if ignore_index:
-                    kdfs.append(DataFrame(sdf))
-                else:
-                    kdfs.append(DataFrame(first_kdf._internal.copy(sdf=sdf)))
+            kdfs = [kdf[merged_columns] for kdf in objs]
         elif join == "outer":
             # If there are columns unmatched, just sort the column names.
-            merged_columns = set(
-                itertools.chain.from_iterable(columns_to_apply))
+            merged_columns = \
+                sorted(list(set(itertools.chain.from_iterable(column_indexes_of_kdfs))))

             kdfs = []
             for kdf in objs:
-                if ignore_index:
-                    columns_to_add = merged_columns - set(kdf._internal.data_columns)
-                else:
-                    columns_to_add = merged_columns - set(kdf._internal.columns)
+                columns_to_add = list(set(merged_columns) - set(kdf._internal.column_index))

                 # TODO: NaN and None difference for missing values. pandas seems filling NaN.
-                kdf = kdf.assign(**dict(zip(columns_to_add, [None] * len(columns_to_add))))
+                sdf = kdf._sdf
+                for idx in columns_to_add:
+                    sdf = sdf.withColumn(str(idx), F.lit(None))

-                if ignore_index:
-                    sdf = kdf._sdf.select(sorted(kdf._internal.data_columns))
-                else:
-                    sdf = kdf._sdf.select(
-                        kdf._internal.index_columns + sorted(kdf._internal.data_columns))
+                kdf = DataFrame(kdf._internal.copy(
+                    sdf=sdf,
+                    data_columns=kdf._internal.data_columns + [str(idx) for idx in columns_to_add],
+                    column_index=kdf._internal.column_index + columns_to_add))

-                kdf = DataFrame(kdf._internal.copy(sdf=sdf,
-                                                   data_columns=sorted(kdf._internal.data_columns)))
-                kdfs.append(kdf)
+                kdfs.append(kdf[merged_columns])
         else:
             raise ValueError(
                 "Only can inner (intersect) or outer (union) join the other axis.")

-    concatenated = kdfs[0]._sdf
-    for kdf in kdfs[1:]:
-        concatenated = concatenated.unionByName(kdf._sdf)
-
     if ignore_index:
-        result_kdf = DataFrame(concatenated.select(kdfs[0]._internal.data_columns))
+        sdfs = [kdf._sdf.select(kdf._internal.data_scols) for kdf in kdfs]
     else:
-        result_kdf = DataFrame(kdfs[0]._internal.copy(sdf=concatenated))
+        sdfs = [kdf._sdf.select(kdf._internal.index_scols + kdf._internal.data_scols)
+                for kdf in kdfs]
+    concatenated = reduce(lambda x, y: x.union(y), sdfs)
+
+    index_map = None if ignore_index else kdfs[0]._internal.index_map
+    result_kdf = DataFrame(kdfs[0]._internal.copy(sdf=concatenated, index_map=index_map))

     if should_return_series:
         # If all input were Series, we should return Series.
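The outer join now aligns column indexes rather than raw column names: missing columns are added as null literals, every frame is re-selected into the same sorted column order, and the Spark DataFrames are combined with a positional `union` instead of `unionByName`. A self-contained PySpark sketch of that alignment strategy (assumes a running `SparkSession` named `spark`; not the koalas code itself):

    from functools import reduce
    from pyspark.sql import functions as F

    df1 = spark.createDataFrame([(1, 3)], ['A', 'B'])
    df2 = spark.createDataFrame([(5,)], ['A'])

    merged_columns = sorted({c for df in (df1, df2) for c in df.columns})  # ['A', 'B']
    aligned = []
    for df in (df1, df2):
        for c in set(merged_columns) - set(df.columns):
            df = df.withColumn(c, F.lit(None))       # fill missing columns with nulls
        aligned.append(df.select(merged_columns))    # identical column order everywhere
    result = reduce(lambda x, y: x.union(y), aligned)  # positional union, as in the diff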
databricks/koalas/tests/test_namespace.py (31 additions, 0 deletions)

@@ -38,6 +38,14 @@ def test_concat(self):
             ks.concat([kdf, kdf.reset_index()]),
             pd.concat([pdf, pdf.reset_index()]))

+        self.assert_eq(
+            ks.concat([kdf, kdf[['A']]], ignore_index=True),
+            pd.concat([pdf, pdf[['A']]], ignore_index=True))
+
+        self.assert_eq(
+            ks.concat([kdf, kdf[['A']]], join="inner"),
+            pd.concat([pdf, pdf[['A']]], join="inner"))
+
         self.assertRaisesRegex(TypeError, "first argument must be", lambda: ks.concat(kdf))
         self.assertRaisesRegex(
             TypeError, "cannot concatenate object", lambda: ks.concat([kdf, 1]))
@@ -46,8 +54,31 @@ def test_concat(self):
         self.assertRaisesRegex(
             ValueError, "Index type and names should be same", lambda: ks.concat([kdf, kdf2]))

+        self.assertRaisesRegex(ValueError, "No objects to concatenate", lambda: ks.concat([]))
+
         self.assertRaisesRegex(
             ValueError, "All objects passed", lambda: ks.concat([None, None]))

         self.assertRaisesRegex(
             ValueError, 'axis should be either 0 or', lambda: ks.concat([kdf, kdf], axis=1))

+        pdf3 = pdf.copy()
+        kdf3 = kdf.copy()
+
+        columns = pd.MultiIndex.from_tuples([('X', 'A'), ('X', 'B')])
+        pdf3.columns = columns
+        kdf3.columns = columns
+
+        self.assert_eq(ks.concat([kdf3, kdf3.reset_index()]),
+                       pd.concat([pdf3, pdf3.reset_index()]))
+
+        self.assert_eq(
+            ks.concat([kdf3, kdf3[[('X', 'A')]]], ignore_index=True),
+            pd.concat([pdf3, pdf3[[('X', 'A')]]], ignore_index=True))
+
+        self.assert_eq(
+            ks.concat([kdf3, kdf3[[('X', 'A')]]], join="inner"),
+            pd.concat([pdf3, pdf3[[('X', 'A')]]], join="inner"))
+
+        self.assertRaisesRegex(ValueError, "MultiIndex columns should have the same levels",
+                               lambda: ks.concat([kdf, kdf3]))