diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py
index 7eea37f99f2a..3aa82b85b57e 100644
--- a/python/ray/dataframe/__init__.py
+++ b/python/ray/dataframe/__init__.py
@@ -3,7 +3,7 @@
 from __future__ import print_function

 import pandas as pd
-from pandas import (eval, Panel, date_range, MultiIndex)
+from pandas import (eval, unique, value_counts, Panel, date_range, MultiIndex)
 import threading

 pd_version = pd.__version__
@@ -35,10 +35,13 @@ def get_npartitions():
                  read_msgpack, read_stata, read_sas, read_pickle,  # noqa: 402
                  read_sql)  # noqa: 402
 from .concat import concat  # noqa: 402
+from .datetimes import to_datetime  # noqa: 402
+from .reshape import get_dummies  # noqa: 402

 __all__ = [
     "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
-    "Panel", "date_range", "MultiIndex"
+    "unique", "value_counts", "to_datetime", "get_dummies", "Panel",
+    "date_range", "MultiIndex"
 ]

 try:
diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index b96c4c836453..075d99709c36 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -2433,9 +2433,44 @@ def mod(self, other, axis='columns', level=None, fill_value=None):
                                      fill_value)

     def mode(self, axis=0, numeric_only=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Perform mode across the DataFrame.
+
+        Args:
+            axis (int): The axis to take the mode on.
+            numeric_only (bool): If True, only apply to numeric columns.
+
+        Returns:
+            DataFrame: The mode of the DataFrame.
+        """
+        axis = pd.DataFrame()._get_axis_number(axis)
+
+        def mode_helper(df):
+            mode_df = df.mode(axis=axis, numeric_only=numeric_only)
+            return mode_df, mode_df.shape[axis]
+
+        def fix_length(df, *lengths):
+            max_len = max(lengths[0])
+            df = df.reindex(pd.RangeIndex(max_len), axis=axis)
+            return df
+
+        parts = self._col_partitions if axis == 0 else self._row_partitions
+
+        result = [_deploy_func._submit(args=(lambda df: mode_helper(df),
+                                             part), num_return_vals=2)
+                  for part in parts]
+
+        parts, lengths = [list(t) for t in zip(*result)]
+
+        parts = [_deploy_func.remote(
+            lambda df, *l: fix_length(df, l), part, *lengths)
+            for part in parts]
+
+        if axis == 0:
+            return DataFrame(col_partitions=parts,
+                             columns=self.columns)
+        else:
+            return DataFrame(row_partitions=parts,
+                             index=self.index)

     def mul(self, other, axis='columns', level=None, fill_value=None):
         """Multiplies this DataFrame against another DataFrame/Series/scalar.
@@ -3734,9 +3769,7 @@ def _getitem_array(self, key):
                              index=index)
         else:
             columns = self._col_metadata[key].index
-
-            indices_for_rows = [self.columns.index(new_col)
-                                for new_col in columns]
+            indices_for_rows = [col for col in self.columns if col in set(columns)]

             new_parts = [_deploy_func.remote(
                 lambda df: df.__getitem__(indices_for_rows),
diff --git a/python/ray/dataframe/datetimes.py b/python/ray/dataframe/datetimes.py
new file mode 100644
index 000000000000..48a437cebc7a
--- /dev/null
+++ b/python/ray/dataframe/datetimes.py
@@ -0,0 +1,64 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pandas
+import ray
+
+from .dataframe import DataFrame
+from .utils import _map_partitions
+
+
+def to_datetime(arg, errors='raise', dayfirst=False, yearfirst=False, utc=None,
+                box=True, format=None, exact=True, unit=None,
+                infer_datetime_format=False, origin='unix'):
+    """Convert the arg to datetime format. If arg is not a Ray DataFrame,
+    this falls back on pandas.
+
+    Args:
+        errors ('raise' or 'ignore'): If 'ignore', errors are silenced.
+        dayfirst (bool): Date format is passed in as day first.
+        yearfirst (bool): Date format is passed in as year first.
+        utc (bool): Returns a UTC DatetimeIndex if True.
+        box (bool): If True, returns a DatetimeIndex.
+        format (string): strftime to parse time, e.g. "%d/%m/%Y".
+        exact (bool): If True, require an exact format match.
+        unit (string, default 'ns'): unit of the arg.
+        infer_datetime_format (bool): Whether or not to infer the format.
+        origin (string): Define the reference date.
+
+    Returns:
+        Type depends on input:
+
+        - list-like: DatetimeIndex
+        - Series: Series of datetime64 dtype
+        - scalar: Timestamp
+    """
+    if not isinstance(arg, DataFrame):
+        return pandas.to_datetime(arg, errors=errors, dayfirst=dayfirst,
+                                  yearfirst=yearfirst, utc=utc, box=box,
+                                  format=format, exact=exact, unit=unit,
+                                  infer_datetime_format=infer_datetime_format,
+                                  origin=origin)
+    if errors == 'raise':
+        pandas.to_datetime(pandas.DataFrame(columns=arg.columns),
+                           errors=errors, dayfirst=dayfirst,
+                           yearfirst=yearfirst, utc=utc, box=box,
+                           format=format, exact=exact, unit=unit,
+                           infer_datetime_format=infer_datetime_format,
+                           origin=origin)
+
+    def datetime_helper(df, cols):
+        df.columns = cols
+        return pandas.to_datetime(df, errors=errors, dayfirst=dayfirst,
+                                  yearfirst=yearfirst, utc=utc, box=box,
+                                  format=format, exact=exact, unit=unit,
+                                  infer_datetime_format=infer_datetime_format,
+                                  origin=origin)
+
+    datetime_series = _map_partitions(datetime_helper, arg._row_partitions,
+                                      arg.columns)
+    result = pandas.concat(ray.get(datetime_series), copy=False)
+    result.index = arg.index
+
+    return result
diff --git a/python/ray/dataframe/reshape.py b/python/ray/dataframe/reshape.py
new file mode 100644
index 000000000000..1883f11b78d7
--- /dev/null
+++ b/python/ray/dataframe/reshape.py
@@ -0,0 +1,125 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import ray
+import pandas
+import numpy as np
+
+from pandas import compat
+from pandas.core.dtypes.common import is_list_like
+from itertools import cycle
+
+from .dataframe import DataFrame
+from .utils import _deploy_func
+
+
+def get_dummies(data, prefix=None, prefix_sep='_', dummy_na=False,
+                columns=None, sparse=False, drop_first=False):
+    """Convert categorical variable into indicator variables.
+
+    Args:
+        data (array-like, Series, or DataFrame): data to encode.
+        prefix (string, [string]): Prefix to apply to each encoded column
+            label.
+        prefix_sep (string, [string]): Separator between prefix and value.
+        dummy_na (bool): Add a column to indicate NaNs.
+        columns: Which columns to encode.
+        sparse (bool): Not implemented: if True, returns SparseDataFrame.
+        drop_first (bool): Whether to remove the first level of encoded data.
+
+    Returns:
+        DataFrame of one-hot encoded data.
+    """
+    if not isinstance(data, DataFrame):
+        return pandas.get_dummies(data, prefix=prefix, prefix_sep=prefix_sep,
+                                  dummy_na=dummy_na, columns=columns,
+                                  sparse=sparse, drop_first=drop_first)
+
+    if sparse:
+        raise NotImplementedError(
+            "SparseDataFrame is not implemented. "
" + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") + + if columns is None: + columns_to_encode = data.dtypes.isin([np.dtype("O"), 'category']) + columns_to_encode = data.columns[columns_to_encode] + else: + columns_to_encode = columns + + def check_len(item, name): + len_msg = ("Length of '{name}' ({len_item}) did not match the " + "length of the columns being encoded ({len_enc}).") + + if is_list_like(item): + if not len(item) == len(columns_to_encode): + len_msg = len_msg.format(name=name, len_item=len(item), + len_enc=len(columns_to_encode)) + raise ValueError(len_msg) + + check_len(prefix, 'prefix') + check_len(prefix_sep, 'prefix_sep') + if isinstance(prefix, compat.string_types): + prefix = cycle([prefix]) + prefix = [next(prefix) for i in range(len(columns_to_encode))] + if isinstance(prefix, dict): + prefix = [prefix[col] for col in columns_to_encode] + + if prefix is None: + prefix = columns_to_encode + + # validate separators + if isinstance(prefix_sep, compat.string_types): + prefix_sep = cycle([prefix_sep]) + prefix_sep = [next(prefix_sep) for i in range(len(columns_to_encode))] + elif isinstance(prefix_sep, dict): + prefix_sep = [prefix_sep[col] for col in columns_to_encode] + + if set(columns_to_encode) == set(data.columns): + with_dummies = [] + dropped_columns = pandas.Index() + else: + with_dummies = data.drop(columns_to_encode, axis=1)._col_partitions + dropped_columns = data.columns.drop(columns_to_encode) + + def get_dummies_remote(df, to_drop, prefix, prefix_sep): + df = df.drop(to_drop, axis=1) + + if df.size == 0: + return df, df.columns + + df = pandas.get_dummies(df, prefix=prefix, prefix_sep=prefix_sep, + dummy_na=dummy_na, columns=None, sparse=sparse, + drop_first=drop_first) + columns = df.columns + df.columns = pandas.RangeIndex(0, len(df.columns)) + return df, columns + + total = 0 + columns = [] + for i, part in enumerate(data._col_partitions): + col_index = data._col_metadata.partition_series(i) + + # TODO(kunalgosar): Handle the case of duplicate columns here + to_encode = col_index.index.isin(columns_to_encode) + + to_encode = col_index[to_encode] + to_drop = col_index.drop(to_encode.index) + + result = _deploy_func._submit( + args=(get_dummies_remote, part, to_drop, + prefix[total:total + len(to_encode)], + prefix_sep[total:total + len(to_encode)]), + num_return_vals=2) + + with_dummies.append(result[0]) + columns.append(result[1]) + total += len(to_encode) + + columns = ray.get(columns) + dropped_columns = dropped_columns.append(columns) + + return DataFrame(col_partitions=with_dummies, + columns=dropped_columns, + index=data.index) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 60d2862d9cf9..3487e4342a8d 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -2059,11 +2059,11 @@ def test_mod(): test_inter_df_math("mod", simple=False) -def test_mode(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.mode() +@pytest.fixture +def test_mode(ray_df, pandas_df): + assert(ray_series_equals_pandas(ray_df.mode(), pandas_df.mode())) + assert(ray_series_equals_pandas(ray_df.mode(axis=1), + pandas_df.mode(axis=1))) def test_mul(): @@ -3058,3 +3058,25 @@ def test__doc__(): pd_obj = getattr(pd.DataFrame, attr, None) if callable(pd_obj) or isinstance(pd_obj, property): assert obj.__doc__ == pd_obj.__doc__ + + +def test_to_datetime(): + ray_df = rdf.DataFrame({'year': [2015, 
+                            'month': [2, 3],
+                            'day': [4, 5]})
+    pd_df = pd.DataFrame({'year': [2015, 2016],
+                          'month': [2, 3],
+                          'day': [4, 5]})
+
+    assert rdf.to_datetime(ray_df).equals(pd.to_datetime(pd_df))
+
+
+def test_get_dummies():
+    ray_df = rdf.DataFrame({'A': ['a', 'b', 'a'],
+                            'B': ['b', 'a', 'c'],
+                            'C': [1, 2, 3]})
+    pd_df = pd.DataFrame({'A': ['a', 'b', 'a'],
+                          'B': ['b', 'a', 'c'],
+                          'C': [1, 2, 3]})
+
+    assert ray_df_equals_pandas(rdf.get_dummies(ray_df), pd.get_dummies(pd_df))
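
Usage sketch (reviewer note, not part of the diff): the snippet below is a minimal illustration of how the entry points added above (ray.dataframe.to_datetime, ray.dataframe.get_dummies, and DataFrame.mode) are intended to be called, mirroring the new tests. It assumes Ray has already been initialized (either by the package on import or via an explicit ray.init()), and the example column names are invented for illustration only.

    import ray.dataframe as rdf

    # Assumption: importing ray.dataframe (or an earlier ray.init()) has
    # already started a local Ray instance, as in the test suite.
    dates_df = rdf.DataFrame({'year': [2015, 2016],
                              'month': [2, 3],
                              'day': [4, 5]})

    # to_datetime on a Ray DataFrame is applied per row partition and the
    # results are concatenated into a plain pandas Series of datetime64
    # values (see datetimes.py above).
    dates = rdf.to_datetime(dates_df)

    # get_dummies one-hot encodes the object/category columns per column
    # partition and reassembles a new Ray DataFrame; sparse=True raises
    # NotImplementedError.
    cats_df = rdf.DataFrame({'A': ['a', 'b', 'a'], 'C': [1, 2, 3]})
    encoded = rdf.get_dummies(cats_df)

    # DataFrame.mode is computed across column partitions for axis=0 (or
    # row partitions for axis=1) and padded to a common length.
    modes = cats_df.mode()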