Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e7e78ce
Added type checking and changed how variables were read in from kwargs
williamma12 Aug 29, 2018
d686848
Merge branch 'rewrite_backend' into rewrite_backend
devin-petersohn Aug 30, 2018
cf9b05d
Updated sample to new architecture
williamma12 Aug 30, 2018
2b94c77
Fixed merge conflict
williamma12 Aug 30, 2018
8724956
Made test_sample more rigorous
williamma12 Aug 31, 2018
2ad1c3b
Removed 'default=' from kwargs.get's
williamma12 Aug 31, 2018
502bd0d
Updated eval to the new backend
williamma12 Aug 31, 2018
992a8e2
Added two more tests for eval
williamma12 Aug 31, 2018
7cbb17a
Updated memory_usage to new backend
williamma12 Sep 1, 2018
b144b3d
Updated info and memory_usage to the new backend
williamma12 Sep 2, 2018
4e7adda
Updated info and memory_usage to be standalone tests and updated the …
williamma12 Sep 2, 2018
8a69de5
Updated info to do only one pass
williamma12 Sep 2, 2018
0ea925f
Updated info to do everything in one run with DataFrame
williamma12 Sep 2, 2018
8a7b320
Update info to do everything in one run with Series
williamma12 Sep 2, 2018
8585f8f
Updated info to do everything in one run with DataFrame
williamma12 Sep 2, 2018
e273288
Updated to get everything working and moved appropriate parts to Data…
williamma12 Sep 2, 2018
9d0f224
Fixed merge conflicts
williamma12 Sep 2, 2018
5d52f7f
Removed extraneous print statement
williamma12 Sep 6, 2018
7571b3d
Moved dtypes stuff to data manager
williamma12 Sep 6, 2018
e70aaec
Fixed calculating dtypes to only doing a full_reduce instead of map_f…
williamma12 Sep 6, 2018
2d3a0a5
Merge branch 'data_manager_dtypes' into rewrite_backend
williamma12 Sep 6, 2018
59d6c41
Updated astype to new backend
williamma12 Sep 7, 2018
f53917f
Updated astype to new backend
williamma12 Sep 7, 2018
e1b01fc
Updated ftypes to new backend
williamma12 Sep 7, 2018
3c28c8f
Added dtypes argument to map_partitions
williamma12 Sep 7, 2018
f6a8ed6
Merge branch 'rewrite_backend' into rewrite_backend
williamma12 Sep 7, 2018
c48c861
Updated astype and added dtypes option to _from_old_block_partitions …
williamma12 Sep 7, 2018
de9bc00
Fixed merge conflict
williamma12 Sep 7, 2018
a47f8be
Undid unnecessary change
williamma12 Sep 7, 2018
39d8330
Merge branch 'rewrite_backend' of https://github.com/devin-petersohn/…
williamma12 Sep 7, 2018
9c38231
Updated iterables to new backend
williamma12 Sep 8, 2018
c8b6301
Updated to_datetime to new backend
williamma12 Sep 10, 2018
a9af405
Reverted some changes for PR
williamma12 Sep 10, 2018
7e3c03d
Replaced pd with pandas
williamma12 Sep 10, 2018
ebea577
Made additional changes mentioned in (#7)
williamma12 Sep 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np
import pandas

from pandas.compat import string_types
from pandas.core.dtypes.cast import find_common_type
from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like)
Expand Down Expand Up @@ -758,6 +759,14 @@ def std(self, **kwargs):
func = self._prepare_method(pandas.DataFrame.std, **kwargs)
return self.full_axis_reduce(func, axis)

def to_datetime(self, **kwargs):
    """Reduce the frame across its columns with ``pandas.to_datetime``.

    Args:
        **kwargs: Keyword arguments forwarded to ``pandas.to_datetime``
            (also passed through ``self._prepare_method``).

    Returns:
        The result of ``self.full_axis_reduce`` over axis 1 (rows reduced
        across all column partitions).
    """
    # Capture the real column labels up front: the per-partition frames
    # do not carry them (see from_pandas, which replaces columns with a
    # RangeIndex), and pandas.to_datetime on a DataFrame needs the actual
    # component names (year/month/day, ...) to assemble datetimes.
    columns = self.columns
    def to_datetime_builder(df, **kwargs):
        # Restore the original labels before delegating to pandas.
        df.columns = columns
        return pandas.to_datetime(df, **kwargs)
    func = self._prepare_method(to_datetime_builder, **kwargs)
    # axis=1: each row, spanning every column partition, reduces to one
    # datetime value.
    return self.full_axis_reduce(func, 1)

def var(self, **kwargs):
# Pandas default is 0 (though not mentioned in docs)
axis = kwargs.get("axis", 0)
Expand Down Expand Up @@ -866,8 +875,8 @@ def cumprod(self, **kwargs):

def dropna(self, **kwargs):
axis = kwargs.get("axis", 0)
subset = kwargs.get("subset", None)
thresh = kwargs.get("thresh", None)
subset = kwargs.get("subset")
thresh = kwargs.get("thresh")
how = kwargs.get("how", "any")
# We need to subset the axis that we care about with `subset`. This
# will be used to determine the number of values that are NA.
Expand Down Expand Up @@ -930,7 +939,7 @@ def fillna(self, **kwargs):
cls = type(self)

axis = kwargs.get("axis", 0)
value = kwargs.pop("value", None)
value = kwargs.pop("value")

if isinstance(value, dict):
if axis == 0:
Expand All @@ -957,8 +966,9 @@ def describe(self, **kwargs):
new_data = self.map_across_full_axis(axis, func)
new_index = self.compute_index(0, new_data, False)
new_columns = self.compute_index(1, new_data, True)
new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns)

return cls(new_data, new_index, new_columns)
return cls(new_data, new_index, new_columns, new_dtypes)

def rank(self, **kwargs):
cls = type(self)
Expand Down Expand Up @@ -1062,7 +1072,7 @@ def from_pandas(cls, df, block_partitions_cls):
df.columns = pandas.RangeIndex(len(df.columns))
new_data = block_partitions_cls.from_pandas(df)

return cls(new_data, new_index, new_columns, new_dtypes)
return cls(new_data, new_index, new_columns, dtypes=new_dtypes)

# __getitem__ methods
def getitem_single_key(self, key):
Expand Down Expand Up @@ -1144,7 +1154,6 @@ def delitem(df, internal_indices=[]):
# it throws an error.
new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices]
new_dtypes = self.dtypes.drop(columns)

return cls(new_data, new_index, new_columns, new_dtypes)
# END __delitem__ and drop

Expand All @@ -1164,7 +1173,7 @@ def insert(df, internal_indices=[]):
new_data = self.data.apply_func_to_select_indices_along_full_axis(0, insert, loc, keep_remaining=True)
new_columns = self.columns.insert(loc, column)

# Because a Pandas Series does not allow insert, we make a DataFrame
# Because a Pandas Series does not allow insert, we make a DataFrame
# and insert the new dtype that way.
temp_dtypes = pandas.DataFrame(self.dtypes).T
temp_dtypes.insert(loc, column, _get_dtype_from_object(value))
Expand All @@ -1187,18 +1196,21 @@ def astype(self, col_dtypes, errors='raise', **kwargs):

for i, column in enumerate(columns):
dtype = col_dtypes[column]
if dtype in dtype_indices.keys():
dtype_indices[dtype].append(numeric_indices[i])
else:
dtype_indices[dtype] = [numeric_indices[i]]
new_dtype = np.dtype(dtype)
if dtype != np.int32 and new_dtype == np.int32:
new_dtype = np.dtype('int64')
elif dtype != np.float32 and new_dtype == np.float32:
new_dtype = np.dtype('float64')
new_dtypes[column] = new_dtype

if dtype != self.dtypes[column]:
if dtype in dtype_indices.keys():
dtype_indices[dtype].append(numeric_indices[i])
else:
dtype_indices[dtype] = [numeric_indices[i]]
new_dtype = np.dtype(dtype)
if dtype != np.int32 and new_dtype == np.int32:
new_dtype = np.dtype('int64')
elif dtype != np.float32 and new_dtype == np.float32:
new_dtype = np.dtype('float64')
new_dtypes[column] = new_dtype

new_data = self.data
for dtype in dtype_indices.keys():
resulting_dtype = None

def astype(df, internal_indices=[]):
block_dtypes = dict()
Expand All @@ -1208,8 +1220,8 @@ def astype(df, internal_indices=[]):

new_data = self.data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True)

return cls(self.data, self.index, self.columns, new_dtypes)
# END astype
return cls(new_data, self.index, self.columns, new_dtypes)
# END type conversions

# UDF (apply and agg) methods
# There is a wide range of behaviors that are supported, so a lot of the
Expand Down Expand Up @@ -1302,7 +1314,6 @@ def callable_apply_builder(df, func, axis, index, *args, **kwargs):

func_prepared = self._prepare_method(lambda df: callable_apply_builder(df, func, axis, index, *args, **kwargs))
result_data = self.map_across_full_axis(axis, func_prepared)

return self._post_process_apply(result_data, axis)
# END UDF

Expand Down
33 changes: 12 additions & 21 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1912,17 +1912,14 @@ def iterrows(self):
Returns:
A generator that iterates over the rows of the frame.
"""
index_iter = (self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions)))
index_iter = iter(self.index)

def iterrow_helper(part):
df = ray.get(part)
def iterrow_builder(df):
df.columns = self.columns
df.index = next(index_iter)
df.index = [next(index_iter)]
return df.iterrows()

partition_iterator = PartitionIterator(self._row_partitions,
iterrow_helper)
partition_iterator = PartitionIterator(self._data_manager, 0, iterrow_builder)

for v in partition_iterator:
yield v
Expand All @@ -1938,17 +1935,14 @@ def items(self):
Returns:
A generator that iterates over the columns of the frame.
"""
col_iter = (self._col_metadata.partition_series(i).index
for i in range(len(self._col_partitions)))
col_iter = iter(self.columns)

def items_helper(part):
df = ray.get(part)
df.columns = next(col_iter)
def items_builder(df):
df.columns = [next(col_iter)]
df.index = self.index
return df.items()

partition_iterator = PartitionIterator(self._col_partitions,
items_helper)
partition_iterator = PartitionIterator(self._data_manager, 1, items_builder)

for v in partition_iterator:
yield v
Expand Down Expand Up @@ -1980,17 +1974,14 @@ def itertuples(self, index=True, name='Pandas'):
Returns:
A tuple representing row data. See args for varying tuples.
"""
index_iter = (self._row_metadata.partition_series(i).index
for i in range(len(self._row_partitions)))
index_iter = iter(self.index)

def itertuples_helper(part):
df = ray.get(part)
def itertuples_builder(df):
df.columns = self.columns
df.index = next(index_iter)
df.index = [next(index_iter)]
return df.itertuples(index=index, name=name)

partition_iterator = PartitionIterator(self._row_partitions,
itertuples_helper)
partition_iterator = PartitionIterator(self._data_manager, 0, itertuples_builder)

for v in partition_iterator:
yield v
Expand Down
48 changes: 15 additions & 33 deletions modin/pandas/datetimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,20 @@ def to_datetime(arg,
unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)
if errors == 'raise':
pandas.to_datetime(
pandas.DataFrame(columns=arg.columns),
errors=errors,
dayfirst=dayfirst,
yearfirst=yearfirst,
utc=utc,
box=box,
format=format,
exact=exact,
unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)

def datetime_helper(df, cols):
df.columns = cols
return pandas.to_datetime(
df,
errors=errors,
dayfirst=dayfirst,
yearfirst=yearfirst,
utc=utc,
box=box,
format=format,
exact=exact,
unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)

datetime_series = _map_partitions(datetime_helper, arg._row_partitions,
arg.columns)
result = pandas.concat(ray.get(datetime_series), copy=False)
result.index = arg.index
# Pandas seems to ignore this kwarg so we will too
#if errors == 'raise':
pandas.to_datetime(
pandas.DataFrame(columns=arg.columns),
errors=errors,
dayfirst=dayfirst,
yearfirst=yearfirst,
utc=utc,
box=box,
format=format,
exact=exact,
unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)

return result
return arg._data_manager.to_datetime()
24 changes: 14 additions & 10 deletions modin/pandas/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@


class PartitionIterator(Iterator):
    # NOTE(review): this span was a diff with the +/- markers stripped, so
    # old and new lines were interleaved (duplicate __init__ signatures,
    # leftover self.partitions / self.iter_cache from the removed version).
    # Reconstructed here as the post-change implementation.
    def __init__(self, data_manager, axis, func):
        """PartitionIterator class to define a generator on partitioned data

        Args:
            data_manager (DataManager): Data manager for the dataframe
            axis (int): axis to iterate over
            func (callable): The function to get inner iterables from
                each partition
        """
        self.data_manager = data_manager
        self.axis = axis
        # Iterate column labels when axis is truthy, row labels otherwise.
        self.index_iter = iter(self.data_manager.columns) if axis else iter(self.data_manager.index)
        self.func = func

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        # Both branches consume the same label iterator; StopIteration
        # propagates from here to end the outer iteration.
        key = next(self.index_iter)
        if self.axis:
            df = self.data_manager.getitem_column_array([key]).to_pandas()
        else:
            df = self.data_manager.getitem_row_array([key]).to_pandas()
        # func turns the single-row/column frame into an iterable; yield
        # its first (only) element.
        return next(self.func(df))