Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
Modin
=====

.. image:: https://travis-ci.com/modin-project/modin.svg?branch=master
:target: https://travis-ci.com/modin-project/modin

.. image:: https://readthedocs.org/projects/modin/badge/?version=latest
:target: https://modin.readthedocs.io/en/latest/?badge=latest
:alt: Documentation Status



Pandas on Ray
-------------

Expand Down
13 changes: 2 additions & 11 deletions modin/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import division
from __future__ import print_function

import pandas
# TODO: In the future `set_option` or similar needs to run on every node
# in order to keep all pandas instances across nodes consistent
from pandas import (eval, unique, value_counts, cut, to_numeric, factorize,
Expand All @@ -12,14 +11,6 @@
set_option, NaT, PeriodIndex, Categorical)
import threading

pandas_version = pandas.__version__
pandas_major = int(pandas_version.split(".")[0])
pandas_minor = int(pandas_version.split(".")[1])

if pandas_major == 0 and pandas_minor != 22:
raise Exception("In order to use Pandas on Ray, your pandas version must "
"be 0.22. You can run 'pip install pandas==0.22'")

DEFAULT_NPARTITIONS = 8


Expand All @@ -34,13 +25,13 @@ def get_npartitions():

# We import these files after the above two functions
# because they depend on npartitions.
from .concat import concat # noqa: 402
from .dataframe import DataFrame # noqa: 402
from .datetimes import to_datetime # noqa: 402
from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402
read_clipboard, read_excel, read_hdf, read_feather, # noqa: 402
read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402
read_sql) # noqa: 402
from .concat import concat # noqa: 402
from .datetimes import to_datetime # noqa: 402
from .reshape import get_dummies # noqa: 402

__all__ = [
Expand Down
2 changes: 2 additions & 0 deletions modin/dataframe/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import print_function

import pandas

import numpy as np

from .dataframe import DataFrame
from .utils import _reindex_helper

Expand Down
55 changes: 27 additions & 28 deletions modin/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,48 @@
from __future__ import print_function

import pandas
import functools
from pandas.api.types import is_scalar
from pandas.util._validators import validate_bool_kwarg
from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip, to_str, string_types, cPickle as pkl
import pandas.core.common as com
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas.core.dtypes.common import (
_get_dtype_from_object,
is_bool_dtype,
is_list_like,
is_numeric_dtype,
is_timedelta64_dtype,
_get_dtype_from_object)
is_timedelta64_dtype)
from pandas.core.index import _ensure_index_from_sequences
from pandas.core.indexing import check_bool_indexer
from pandas.errors import MergeError
from pandas.util._validators import validate_bool_kwarg
from pandas._libs import lib

import warnings
import itertools
import io
import functools
import numpy as np
from numpy.testing import assert_equal
import ray
import itertools
import io
import sys
import re
import sys
import warnings

from .utils import (
_deploy_func,
_map_partitions,
_partition_pandas_dataframe,
to_pandas,
create_blocks_helper,
_blocks_to_col,
_blocks_to_row,
_compile_remote_dtypes,
_concat_index,
_co_op_helper,
_create_block_partitions,
_create_blocks_helper,
_deploy_func,
_fix_blocks_dimensions,
_inherit_docstrings,
_reindex_helper,
_co_op_helper,
_map_partitions,
_match_partitioning,
_concat_index,
fix_blocks_dimensions,
_compile_remote_dtypes)
_partition_pandas_dataframe,
_reindex_helper)
from . import get_npartitions
from .index_metadata import _IndexMetadata
from .iterator import PartitionIterator
Expand Down Expand Up @@ -126,7 +125,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
self._block_partitions = \
fix_blocks_dimensions(self._block_partitions, axis)
_fix_blocks_dimensions(self._block_partitions, axis)

else:
if row_partitions is not None:
Expand Down Expand Up @@ -658,7 +657,7 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
axis = pandas.DataFrame()._get_axis_number(axis)
if callable(by):
by = by(self.index)
elif isinstance(by, compat.string_types):
elif isinstance(by, string_types):
by = self.__getitem__(by).values.tolist()
elif is_list_like(by):
if isinstance(by, pandas.Series):
Expand Down Expand Up @@ -955,7 +954,7 @@ def _aggregate(self, arg, *args, **kwargs):
_axis = getattr(self, 'axis', 0)
kwargs.pop('_level', None)

if isinstance(arg, compat.string_types):
if isinstance(arg, string_types):
return self._string_function(arg, *args, **kwargs)

# Dictionaries have complex behavior because they can be renamed here.
Expand All @@ -972,7 +971,7 @@ def _aggregate(self, arg, *args, **kwargs):
raise ValueError("type {} is not callable".format(type(arg)))

def _string_function(self, func, *args, **kwargs):
assert isinstance(func, compat.string_types)
assert isinstance(func, string_types)

f = getattr(self, func, None)

Expand Down Expand Up @@ -1187,7 +1186,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
"""
axis = pandas.DataFrame()._get_axis_number(axis)

if isinstance(func, compat.string_types):
if isinstance(func, string_types):
if axis == 1:
kwds['axis'] = axis
return getattr(self, func)(*args, **kwds)
Expand Down Expand Up @@ -1231,7 +1230,7 @@ def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
self._col_partitions)

# resolve function names for the DataFrame index
new_index = [f_name if isinstance(f_name, compat.string_types)
new_index = [f_name if isinstance(f_name, string_types)
else f_name.__name__ for f_name in func]
return DataFrame(col_partitions=new_cols,
columns=self.columns,
Expand Down Expand Up @@ -5396,7 +5395,7 @@ def reindex_helper(old_index, new_index, axis, npartitions, method, fill_value,
df = df.reindex(new_index, copy=False, axis=axis ^ 1,
method=method, fill_value=fill_value,
limit=limit, tolerance=tolerance)
return create_blocks_helper(df, npartitions, axis)
return _create_blocks_helper(df, npartitions, axis)


@ray.remote
Expand Down
1 change: 1 addition & 0 deletions modin/dataframe/datetimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import print_function

import pandas

import ray

from .dataframe import DataFrame
Expand Down
2 changes: 1 addition & 1 deletion modin/dataframe/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from __future__ import print_function

import pandas
import numpy as np
import pandas.core.groupby
from pandas.core.dtypes.common import is_list_like
import pandas.core.common as com

import numpy as np
import ray

from .utils import _inherit_docstrings, _reindex_helper
Expand Down
5 changes: 5 additions & 0 deletions modin/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas

import numpy as np
import ray

Expand Down
31 changes: 18 additions & 13 deletions modin/dataframe/indexing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas
from pandas.api.types import (is_scalar, is_list_like, is_bool)
from pandas.core.dtypes.common import is_integer
from pandas.core.indexing import IndexingError

import numpy as np
import ray
from warnings import warn

from .utils import (_blocks_to_col, _get_nan_block_id, extractor,
_mask_block_partitions, writer)
from .index_metadata import _IndexMetadata
from .dataframe import DataFrame

"""Indexing Helper Class works as follows:

_Location_Indexer_Base provide methods framework for __getitem__
Expand All @@ -13,19 +31,6 @@
An illustration is available at
https://github.com/ray-project/ray/pull/1955#issuecomment-386781826
"""
import pandas
import numpy as np
import ray
from warnings import warn

from pandas.api.types import (is_scalar, is_list_like, is_bool)
from pandas.core.dtypes.common import is_integer
from pandas.core.indexing import IndexingError

from .utils import (_blocks_to_col, _get_nan_block_id, extractor,
_mask_block_partitions, writer)
from .index_metadata import _IndexMetadata
from .dataframe import DataFrame


def is_slice(x): return isinstance(x, slice)
Expand Down
9 changes: 4 additions & 5 deletions modin/dataframe/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
from __future__ import division
from __future__ import print_function

import pandas
from pandas.io.common import _infer_compression

from itertools import chain
from io import BytesIO
import os
from pyarrow.parquet import ParquetFile
import re
import warnings
import pandas

from pyarrow.parquet import ParquetFile
from pandas.io.common import _infer_compression  # TODO: avoid relying on this pandas-internal API


from .dataframe import ray, DataFrame
from . import get_npartitions
Expand Down
4 changes: 4 additions & 0 deletions modin/dataframe/iterator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import Iterator


Expand Down
6 changes: 3 additions & 3 deletions modin/dataframe/reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from __future__ import division
from __future__ import print_function

import ray
import pandas
import numpy as np

from pandas import compat
from pandas.core.dtypes.common import is_list_like

from itertools import cycle
import ray
import numpy as np

from .dataframe import DataFrame
from .utils import _deploy_func
Expand Down
3 changes: 2 additions & 1 deletion modin/dataframe/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas

import numpy as np

from .utils import _inherit_docstrings


Expand Down
15 changes: 8 additions & 7 deletions modin/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from __future__ import division
from __future__ import print_function

import collections
import pandas

import collections
import numpy as np
import ray

Expand Down Expand Up @@ -371,15 +372,15 @@ def _create_block_partitions(partitions, axis=0, length=None):
# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
return fix_blocks_dimensions(blocks, axis)
return _fix_blocks_dimensions(blocks, axis)


@ray.remote
def create_blocks(df, npartitions, axis):
return create_blocks_helper(df, npartitions, axis)
return _create_blocks_helper(df, npartitions, axis)


def create_blocks_helper(df, npartitions, axis):
def _create_blocks_helper(df, npartitions, axis):
# Single partition dataframes don't need to be repartitioned
if npartitions == 1:
return df
Expand Down Expand Up @@ -479,7 +480,7 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df):
df.columns = old_index

df = df.reindex(new_index, copy=False, axis=axis ^ 1)
return create_blocks_helper(df, npartitions, axis)
return _create_blocks_helper(df, npartitions, axis)


@ray.remote
Expand Down Expand Up @@ -511,7 +512,7 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx,

new_rows = func(left, right)

new_blocks = create_blocks_helper(new_rows, left_df_len, 0)
new_blocks = _create_blocks_helper(new_rows, left_df_len, 0)

if left_idx is not None:
new_blocks.append(new_rows.index)
Expand Down Expand Up @@ -563,7 +564,7 @@ def _concat_index(*index_parts):
return index_parts[0].append(index_parts[1:])


def fix_blocks_dimensions(blocks, axis):
def _fix_blocks_dimensions(blocks, axis):
"""Checks that blocks is 2D, and adds a dimension if not.
"""
if blocks.ndim < 2:
Expand Down