294 changes: 209 additions & 85 deletions modin/data_management/data_manager.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions modin/data_management/partitioning/README.md
@@ -0,0 +1,13 @@
## Implementation Note

### Object Hierarchy

- `remote_partition.py` contains `RemotePartition` interface and its implementations.
- `partition_collections.py` contains `BlockPartitions` interface and its implementations.
- `BlockPartitions` manages a 2D array of `RemotePartition` objects.
- `axis_partition.py` contains `AxisPartition` with the following hierarchy:
```
AxisPartition -> RayAxisPartition -> {RayColumnPartition, RayRowPartition}
```
- `AxisPartition` is a high-level view onto `BlockPartitions`' data. It is
  sometimes more convenient to operate on `AxisPartition` (see the sketch below).
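
A rough sketch of how these pieces nest, assuming a running Ray cluster. The import paths follow the files in this PR, but the constructor details are inferred from the diff and may differ:

```python
import numpy as np
import pandas
import ray

from modin.data_management.partitioning.remote_partition import RayRemotePartition
from modin.data_management.partitioning.partition_collections import RayBlockPartitions

ray.init()

# A RemotePartition wraps a single block of the frame stored in the object store.
block = RayRemotePartition(ray.put(pandas.DataFrame({"a": [1, 2], "b": [3, 4]})))

# BlockPartitions manages the 2D array of RemotePartition objects.
parts = RayBlockPartitions(np.array([[block]]))
print(parts.shape)  # (2, 2): total rows and total columns across all blocks
```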
94 changes: 93 additions & 1 deletion modin/data_management/partitioning/partition_collections.py
@@ -2,13 +2,15 @@
from __future__ import division
from __future__ import print_function

from typing import Tuple

import numpy as np
import ray
import pandas

from .remote_partition import RayRemotePartition
from .axis_partition import RayColumnPartition, RayRowPartition
from .utils import compute_chunksize
from .utils import compute_chunksize, _get_nan_block_id


class BlockPartitions(object):
@@ -120,6 +122,10 @@ def block_widths(self):
self._widths_cache = [obj.width().get() for obj in self.partitions[0]]
return self._widths_cache

@property
def shape(self) -> Tuple[int, int]:
return int(np.sum(self.block_lengths)), int(np.sum(self.block_widths))

def full_reduce(self, map_func, reduce_func, axis):
"""Perform a full reduce on the data.

@@ -173,6 +179,15 @@ def map_across_blocks(self, map_func):
new_partitions = np.array([[part.apply(preprocessed_map_func) for part in row_of_parts] for row_of_parts in self.partitions])
return cls(new_partitions)

def lazy_map_across_blocks(self, map_func, kwargs):
cls = type(self)
preprocessed_map_func = self.preprocess_func(map_func)
new_partitions = np.array(
[[part.add_to_apply_calls(preprocessed_map_func, kwargs) for part in row_of_parts]
for row_of_parts in self.partitions])
return cls(new_partitions)


def map_across_full_axis(self, axis, map_func):
"""Applies `map_func` to every partition.

@@ -633,6 +648,65 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep

return cls(result.T) if not axis else cls(result)


def apply_func_to_indices_both_axis(self, func, row_indices, col_indices,
lazy=False, keep_remaining=True, mutate=False,
item_to_distribute=None):
"""
Apply a function to along both axis

Important: For your func to operate directly on the indices provided,
it must use `row_internal_indices, col_internal_indices` as keyword arguments.
"""
cls = type(self)

if not mutate:
partition_copy = self.partitions.copy()
else:
partition_copy = self.partitions

operation_mask = np.full(self.partitions.shape, False)

row_position_counter = 0
for row_blk_idx, row_internal_idx in self._get_dict_of_block_index(1, row_indices).items():
col_position_counter = 0
for col_blk_idx, col_internal_idx in self._get_dict_of_block_index(0, col_indices).items():
remote_part = partition_copy[row_blk_idx, col_blk_idx]

if item_to_distribute is not None:
item = item_to_distribute[
row_position_counter:row_position_counter+len(row_internal_idx),
col_position_counter:col_position_counter+len(col_internal_idx)
]
item = {'item': item}
else:
item = dict()

if lazy:
result = remote_part.add_to_apply_calls(func,
row_internal_indices=row_internal_idx,
col_internal_indices=col_internal_idx,
**item)
else:
result = remote_part.apply(func,
row_internal_indices=row_internal_idx,
col_internal_indices=col_internal_idx,
**item)

partition_copy[row_blk_idx, col_blk_idx] = result
operation_mask[row_blk_idx, col_blk_idx] = True

col_position_counter += len(col_internal_idx)
row_position_counter += len(row_internal_idx)

column_idx = np.where(np.any(operation_mask, axis=0))[0]
row_idx = np.where(np.any(operation_mask, axis=1))[0]
if not keep_remaining:
partition_copy = partition_copy[row_idx][:, column_idx]

return cls(partition_copy)
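
A hedged sketch of a function written for this method. Each call receives one block as a pandas.DataFrame plus the positions of the requested rows/columns within that block; the `item` keyword only appears when `item_to_distribute` is passed. The names `write_block`, `parts`, and `values_2d` are illustrative:

```python
def write_block(df, row_internal_indices, col_internal_indices, item=None):
    # Operate only on the requested positions within this block.
    block = df.copy()
    if item is not None:
        block.iloc[row_internal_indices, col_internal_indices] = item
    return block

# new_parts = parts.apply_func_to_indices_both_axis(
#     write_block, row_indices, col_indices, item_to_distribute=values_2d)
```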


def inter_data_operation(self, axis, func, other):
"""Apply a function that requires two BlockPartitions objects.

@@ -686,6 +760,24 @@ def __getitem__(self, key):
def __len__(self):
return sum(self.block_lengths)

def enlarge_partitions(self, n_rows=None, n_cols=None):
data = self.partitions
block_partitions_cls = type(self)

if n_rows:
n_cols_lst = self.block_widths
nan_oids_lst = [self._partition_class(_get_nan_block_id(self._partition_class, n_rows, n_cols_)) for n_cols_ in n_cols_lst]
new_chunk = block_partitions_cls(np.array([nan_oids_lst]))
data = self.concat(axis=0, other_blocks=new_chunk)

if n_cols:
n_rows_lst = self.block_lengths
nan_oids_lst = [self._partition_class(_get_nan_block_id(self._partition_class, n_rows_, n_cols)) for n_rows_ in n_rows_lst]
new_chunk = block_partitions_cls(np.array([nan_oids_lst]).T)
data = self.concat(axis=1, other_blocks=new_chunk)

return data
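
A brief usage sketch (with a hypothetical `parts` object): padding one axis appends a matching row or column of NaN blocks, so the overall shape grows by the requested amount.

```python
padded = parts.enlarge_partitions(n_rows=3)
# padded.shape == (parts.shape[0] + 3, parts.shape[1])

wider = parts.enlarge_partitions(n_cols=2)
# wider.shape == (parts.shape[0], parts.shape[1] + 2)
```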


class RayBlockPartitions(BlockPartitions):
"""This method implements the interface in `BlockPartitions`."""
50 changes: 48 additions & 2 deletions modin/data_management/partitioning/remote_partition.py
@@ -49,6 +49,14 @@ def apply(self, func, **kwargs):
"""
raise NotImplementedError("Must be implemented in child class")

def add_to_apply_calls(self, func, **kwargs):
"""Add the function to the apply function call stack.

This function will be executed when `apply` is called. Queued functions run
in the order they were added; the function passed to `apply` runs last and
its result is returned.
"""
raise NotImplementedError("Must be implemented in child class")

def to_pandas(self):
"""Convert the object stored in this partition to a Pandas DataFrame.

@@ -128,20 +136,28 @@ def width(self):
self._width_cache = self.apply(preprocessed_func)
return self._width_cache

@classmethod
def empty(cls):
raise NotImplementedError("To be implemented in the child class!")


class RayRemotePartition(RemotePartition):

def __init__(self, object_id):
assert type(object_id) is ray.ObjectID

self.oid = object_id
self.call_queue = []

def get(self):
"""Gets the object out of the plasma store.

Returns:
The object from the plasma store.
"""
if len(self.call_queue):
return self.apply(lambda x: x).get()

return ray.get(self.oid)

def apply(self, func, **kwargs):
@@ -157,8 +173,34 @@ def apply(self, func, **kwargs):
Returns:
A RayRemotePartition object.
"""
new_oid = deploy_ray_func.remote(func, self.oid, kwargs)
return RayRemotePartition(new_oid)
oid = self.oid
self.call_queue.append((func, kwargs))

def call_queue_closure(oid_obj, call_queues):

for func, kwargs in call_queues:
if isinstance(func, ray.ObjectID):
func = ray.get(func)
if isinstance(kwargs, ray.ObjectID):
kwargs = ray.get(kwargs)

oid_obj = func(oid_obj, **kwargs)

return oid_obj

oid = deploy_ray_func.remote(call_queue_closure, oid, kwargs={'call_queues': self.call_queue})
self.call_queue = []

return RayRemotePartition(oid)


def add_to_apply_calls(self, func, **kwargs):
self.call_queue.append((func, kwargs))
return self
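
A short sketch of the deferred-execution behavior, assuming `part` is a `RayRemotePartition` holding a numeric DataFrame: queued functions are only submitted to Ray once `apply` (or `get`) drains the queue, and they run in insertion order with `apply`'s own function last.

```python
part = part.add_to_apply_calls(lambda df: df + 1)   # queued, nothing runs yet
part = part.add_to_apply_calls(lambda df: df * 2)   # queued after the first
result = part.apply(lambda df: df.sum())            # executes (df + 1) * 2, then sums
print(result.get())
```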


def __copy__(self):
return RayRemotePartition(object_id=self.oid)

def to_pandas(self):
"""Convert the object stored in this partition to a Pandas DataFrame.
@@ -203,6 +245,10 @@ def length_extraction_fn(cls):
def width_extraction_fn(cls):
return width_fn_pandas

@classmethod
def empty(cls):
return cls.put(pandas.DataFrame())


def length_fn_pandas(df):
assert isinstance(df, (pandas.DataFrame, pandas.Series))
26 changes: 26 additions & 0 deletions modin/data_management/partitioning/utils.py
@@ -2,8 +2,34 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas


def compute_chunksize(length, num_splits):
# We do this to avoid zeros and having an extremely large last partition
return length // num_splits if length % num_splits == 0 \
else length // num_splits + 1
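
Two hedged worked examples of the chunking rule above:

```python
compute_chunksize(16, 4)   # divides evenly -> 4 (splits of 4, 4, 4, 4)
compute_chunksize(10, 4)   # does not divide -> 10 // 4 + 1 = 3
                           # (splits of 3, 3, 3, 1 instead of 2, 2, 2, 4)
```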


def _get_nan_block_id(partition_class, n_row=1, n_col=1, transpose=False):
"""A memory efficient way to get a block of NaNs.

Args:
partition_class (RemotePartition): The class to use to put the object
in the remote format.
n_row(int): The number of rows.
n_col(int): The number of columns.
transpose(bool): If true, swap rows and columns.
Returns:
ObjectID of the NaN block.
"""
global _NAN_BLOCKS
if transpose:
n_row, n_col = n_col, n_row
shape = (n_row, n_col)
if shape not in _NAN_BLOCKS:
arr = np.tile(np.array(np.NaN), shape)
# TODO Not use pandas.DataFrame here, but something more general.
_NAN_BLOCKS[shape] = partition_class.put(pandas.DataFrame(data=arr))
return _NAN_BLOCKS[shape]
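
The `_NAN_BLOCKS` cache is assumed to be a module-level dict (its initialization is outside this hunk); a repeated request for the same shape then reuses the stored ObjectID rather than putting a new DataFrame:

```python
# Assumed module-level cache, keyed by (n_row, n_col):
_NAN_BLOCKS = {}

# Both calls below would return the same ObjectID, since the transposed
# request resolves to the identical (4, 2) shape:
# oid_a = _get_nan_block_id(RayRemotePartition, n_row=4, n_col=2)
# oid_b = _get_nan_block_id(RayRemotePartition, n_row=2, n_col=4, transpose=True)
```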
29 changes: 8 additions & 21 deletions modin/pandas/__init__.py
@@ -14,6 +14,14 @@
import ray

from .. import __git_revision__, __version__
from .concat import concat
from .dataframe import DataFrame
from .datetimes import to_datetime
from .io import (
read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
read_sql)
from .reshape import get_dummies

try:
if threading.current_thread().name == "MainThread":
@@ -30,27 +38,6 @@
num_cpus = ray.global_state.cluster_resources()['CPU']
DEFAULT_NPARTITIONS = int(num_cpus)


def set_npartition_default(n):
global DEFAULT_NPARTITIONS
DEFAULT_NPARTITIONS = n


def get_npartitions():
return DEFAULT_NPARTITIONS


# We import these file after above two function
# because they depend on npartitions.
from .concat import concat # noqa: 402
from .dataframe import DataFrame # noqa: 402
from .datetimes import to_datetime # noqa: 402
from .io import ( # noqa: 402
read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
read_sql)
from .reshape import get_dummies # noqa: 402

__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "read_json",
"read_html", "read_clipboard", "read_excel", "read_hdf", "read_feather",
8 changes: 4 additions & 4 deletions modin/pandas/dataframe.py
@@ -4396,8 +4396,8 @@ def loc(self):
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
from .indexing import _Loc_Indexer
return _Loc_Indexer(self)
from .indexing import _LocIndexer
return _LocIndexer(self)

@property
def is_copy(self):
@@ -4422,8 +4422,8 @@ def iloc(self):
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
from .indexing import _iLoc_Indexer
return _iLoc_Indexer(self)
from .indexing import _iLocIndexer
return _iLocIndexer(self)
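
A small usage sketch of the indexer forms the docstrings above say are currently supported (assuming `import modin.pandas as pd`):

```python
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
df.loc[0]             # single label
df.loc[[0, 2], "a"]   # list of labels
df.iloc[0:2, 1]       # slice plus position
# Boolean-array and callable indexers are not supported by these indexers yet.
```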

def _create_dataframe_from_manager(self, new_manager, inplace=False):
"""Returns or updates a DataFrame given new data_manager"""
4 changes: 0 additions & 4 deletions modin/pandas/index_metadata.py

This file was deleted.
