def to_datetime(self, **kwargs):
    """Convert this frame to datetimes, mirroring ``pandas.to_datetime``.

    All kwargs are forwarded to ``pandas.to_datetime``. The conversion is
    performed as a full-axis reduction across the columns (axis=1).

    Returns:
        The result of ``full_axis_reduce`` over the datetime-converted data.
    """
    column_labels = self.columns

    def build_to_datetime(df, **dt_kwargs):
        # Partitions carry placeholder column labels; restore the real
        # ones before handing the frame to pandas.to_datetime (which
        # presumably reads the labels — e.g. year/month/day — TODO confirm).
        df.columns = column_labels
        return pandas.to_datetime(df, **dt_kwargs)

    prepared = self._prepare_method(build_to_datetime, **kwargs)
    return self.full_axis_reduce(prepared, 1)
@@ -930,7 +939,7 @@ def fillna(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) - value = kwargs.pop("value", None) + value = kwargs.pop("value") if isinstance(value, dict): if axis == 0: @@ -957,8 +966,9 @@ def describe(self, **kwargs): new_data = self.map_across_full_axis(axis, func) new_index = self.compute_index(0, new_data, False) new_columns = self.compute_index(1, new_data, True) + new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) - return cls(new_data, new_index, new_columns) + return cls(new_data, new_index, new_columns, new_dtypes) def rank(self, **kwargs): cls = type(self) @@ -1062,7 +1072,7 @@ def from_pandas(cls, df, block_partitions_cls): df.columns = pandas.RangeIndex(len(df.columns)) new_data = block_partitions_cls.from_pandas(df) - return cls(new_data, new_index, new_columns, new_dtypes) + return cls(new_data, new_index, new_columns, dtypes=new_dtypes) # __getitem__ methods def getitem_single_key(self, key): @@ -1144,7 +1154,6 @@ def delitem(df, internal_indices=[]): # it throws an error. new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] new_dtypes = self.dtypes.drop(columns) - return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop @@ -1164,7 +1173,7 @@ def insert(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices_along_full_axis(0, insert, loc, keep_remaining=True) new_columns = self.columns.insert(loc, column) - # Because a Pandas Series does not allow insert, we make a DataFrame + # Because a Pandas Series does not allow insert, we make a DataFrame # and insert the new dtype that way. 
temp_dtypes = pandas.DataFrame(self.dtypes).T temp_dtypes.insert(loc, column, _get_dtype_from_object(value)) @@ -1187,18 +1196,21 @@ def astype(self, col_dtypes, errors='raise', **kwargs): for i, column in enumerate(columns): dtype = col_dtypes[column] - if dtype in dtype_indices.keys(): - dtype_indices[dtype].append(numeric_indices[i]) - else: - dtype_indices[dtype] = [numeric_indices[i]] - new_dtype = np.dtype(dtype) - if dtype != np.int32 and new_dtype == np.int32: - new_dtype = np.dtype('int64') - elif dtype != np.float32 and new_dtype == np.float32: - new_dtype = np.dtype('float64') - new_dtypes[column] = new_dtype - + if dtype != self.dtypes[column]: + if dtype in dtype_indices.keys(): + dtype_indices[dtype].append(numeric_indices[i]) + else: + dtype_indices[dtype] = [numeric_indices[i]] + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.dtype('int64') + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.dtype('float64') + new_dtypes[column] = new_dtype + + new_data = self.data for dtype in dtype_indices.keys(): + resulting_dtype = None def astype(df, internal_indices=[]): block_dtypes = dict() @@ -1208,8 +1220,8 @@ def astype(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) - return cls(self.data, self.index, self.columns, new_dtypes) - # END astype + return cls(new_data, self.index, self.columns, new_dtypes) + # END type conversions # UDF (apply and agg) methods # There is a wide range of behaviors that are supported, so a lot of the @@ -1302,7 +1314,6 @@ def callable_apply_builder(df, func, axis, index, *args, **kwargs): func_prepared = self._prepare_method(lambda df: callable_apply_builder(df, func, axis, index, *args, **kwargs)) result_data = self.map_across_full_axis(axis, func_prepared) - return self._post_process_apply(result_data, axis) # END UDF diff --git a/modin/pandas/dataframe.py 
def iterrows(self):
    """Iterate over DataFrame rows as (index, Series) pairs.

    Note:
        Generators are not pickleable, so this is materialized row by
        row through the data manager's partitions.

    Returns:
        A generator that iterates over the rows of the frame.
    """
    row_labels = iter(self.index)

    def build_iterrows(df):
        # Each partition frame holds a single row; give it back its real
        # column labels and its single real row label before iterating.
        df.columns = self.columns
        df.index = [next(row_labels)]
        return df.iterrows()

    for item in PartitionIterator(self._data_manager, 0, build_iterrows):
        yield item
""" - index_iter = (self._row_metadata.partition_series(i).index - for i in range(len(self._row_partitions))) + index_iter = iter(self.index) - def itertuples_helper(part): - df = ray.get(part) + def itertuples_builder(df): df.columns = self.columns - df.index = next(index_iter) + df.index = [next(index_iter)] return df.itertuples(index=index, name=name) - partition_iterator = PartitionIterator(self._row_partitions, - itertuples_helper) + partition_iterator = PartitionIterator(self._data_manager, 0, itertuples_builder) for v in partition_iterator: yield v diff --git a/modin/pandas/datetimes.py b/modin/pandas/datetimes.py index 77a03de607c..a3e293b2fbc 100644 --- a/modin/pandas/datetimes.py +++ b/modin/pandas/datetimes.py @@ -56,38 +56,20 @@ def to_datetime(arg, unit=unit, infer_datetime_format=infer_datetime_format, origin=origin) - if errors == 'raise': - pandas.to_datetime( - pandas.DataFrame(columns=arg.columns), - errors=errors, - dayfirst=dayfirst, - yearfirst=yearfirst, - utc=utc, - box=box, - format=format, - exact=exact, - unit=unit, - infer_datetime_format=infer_datetime_format, - origin=origin) - - def datetime_helper(df, cols): - df.columns = cols - return pandas.to_datetime( - df, - errors=errors, - dayfirst=dayfirst, - yearfirst=yearfirst, - utc=utc, - box=box, - format=format, - exact=exact, - unit=unit, - infer_datetime_format=infer_datetime_format, - origin=origin) - datetime_series = _map_partitions(datetime_helper, arg._row_partitions, - arg.columns) - result = pandas.concat(ray.get(datetime_series), copy=False) - result.index = arg.index + # Pandas seems to ignore this kwarg so we will too + #if errors == 'raise': + pandas.to_datetime( + pandas.DataFrame(columns=arg.columns), + errors=errors, + dayfirst=dayfirst, + yearfirst=yearfirst, + utc=utc, + box=box, + format=format, + exact=exact, + unit=unit, + infer_datetime_format=infer_datetime_format, + origin=origin) - return result + return arg._data_manager.to_datetime() diff --git 
class PartitionIterator(Iterator):
    """Generator over partitioned data, one row or column at a time."""

    def __init__(self, data_manager, axis, func):
        """PartitionIterator class to define a generator on partitioned data

        Args:
            data_manager (DataManager): Data manager for the dataframe
            axis (int): axis to iterate over; truthy iterates columns via
                the column labels, falsy iterates rows via the index
            func (callable): The function to get inner iterables from
                each partition
        """
        self.data_manager = data_manager
        self.axis = axis
        # Column labels drive iteration on axis=1, row labels on axis=0.
        self.index_iter = (iter(self.data_manager.columns)
                           if axis else iter(self.data_manager.index))
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        # Hoisted out of the branches: both paths consume the same label
        # iterator.  The StopIteration raised when labels run out is what
        # terminates the outer iteration.
        key = next(self.index_iter)
        if self.axis:
            df = self.data_manager.getitem_column_array([key]).to_pandas()
        else:
            df = self.data_manager.getitem_row_array([key]).to_pandas()
        # The fetched frame holds exactly one row/column, so func(df)
        # yields exactly one item; return it.
        return next(self.func(df))