Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@
_blocks_to_col,
_blocks_to_row,
_create_block_partitions,
_mask_block_partitions,
_inherit_docstrings,
_reindex_helper)
from . import get_npartitions
from .index_metadata import _IndexMetadata
from .indexing import _Loc_Indexer, _iLoc_Indexer


@_inherit_docstrings(pd.DataFrame)
class DataFrame(object):

def __init__(self, data=None, index=None, columns=None, dtype=None,
copy=False, col_partitions=None, row_partitions=None,
block_partitions=None, row_metadata=None, col_metadata=None):
block_partitions=None, row_metadata=None, col_metadata=None,
partial=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to have a DataFrameView object as a subclass of this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I'm reading through this, I think it might make more sense to have it as a subclass. Changing the way that _block_partitions is handled in the case that it's a view makes complicated code more complicated.

Alternatively, if we decide otherwise, I would still like to see more comments about the _block_partitions changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should have it as a subclass. A DataFrameView object makes sense.

I'm refactoring parts of my indexing.py to make enlargement inside the parent class of the _LocIndexer and _iLocIndexer. So that we have cleaner code in indexing.py

"""Distributed DataFrame object backed by Pandas dataframes.

Args:
Expand All @@ -69,7 +72,11 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
Metadata for the new dataframe's rows
col_metadata (_IndexMetadata):
Metadata for the new dataframe's columns
partial (boolean):
Internal: row_metadata and col_metadata only cover part of the
block partitions. (Used in the index 'view' accessor.)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vew -> view.

"""
self.partial = partial
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partial -> _partial

self._row_metadata = self._col_metadata = None

# Check type of data and use appropriate constructor
Expand All @@ -93,6 +100,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
axis = 0
columns = pd_df.columns
index = pd_df.index
self._row_metadata = self._col_metadata = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated (Remove this in favor of Line 73)

else:
# created this invariant to make sure we never have to go into the
# partitions to get the columns
Expand All @@ -103,25 +111,28 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
if block_partitions is not None:
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
if row_metadata is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this in two places?

self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()
assert self._block_partitions.ndim == 2, \
"Block Partitions must be 2D."
else:
if row_partitions is not None:
axis = 0
partitions = row_partitions
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
elif col_partitions is not None:
axis = 1
partitions = col_partitions
if col_metadata is not None:
self._col_metadata = col_metadata.copy()

self._block_partitions = \
_create_block_partitions(partitions, axis=axis,
length=len(columns))

if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()

# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
Expand All @@ -138,6 +149,21 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)

def _get_block_partitions(self):
if (self._row_metadata is None or self._col_metadata is None)\
or (not self.partial):
return self._block_partitions_data
else: # is partial, need mask
oid_arr = _mask_block_partitions(self._block_partitions_data,
self._row_metadata,
self._col_metadata)
return oid_arr

def _set_block_partitions(self, new_block_partitions):
self._block_partitions_data = new_block_partitions

_block_partitions = property(_get_block_partitions, _set_block_partitions)

def _get_row_partitions(self):
return [_blocks_to_row.remote(*part)
for part in self._block_partitions]
Expand Down Expand Up @@ -408,6 +434,10 @@ def dtypes(self):
Returns:
The dtypes for this DataFrame.
"""
# Deal with empty column case
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer comment to say Deal with a DataFrame with no columns or something along those lines. Empty columns feels a bit ambiguous (could mean all NaN in some cases).

if self.shape[1] == 0:
return pd.Series()

# The dtypes are common across all partitions.
# The first partition will be enough.
result = ray.get(_deploy_func.remote(lambda df: df.dtypes,
Expand Down Expand Up @@ -3775,9 +3805,7 @@ def loc(self):
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return _Loc_Indexer(self)

@property
def is_copy(self):
Expand Down Expand Up @@ -3812,6 +3840,4 @@ def iloc(self):
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return _iLoc_Indexer(self)
26 changes: 23 additions & 3 deletions python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import numpy as np
import ray

import copy

from .utils import (
_build_index,
_build_columns)
Expand Down Expand Up @@ -122,6 +124,12 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
dfs ([ObjectID]): ObjectIDs of dataframe partitions
index (pd.Index): Index of the Ray DataFrame.
axis: Axis of partition (0=row partitions, 1=column partitions)
lengths_oid (Union[ObjectID, List[ObjectID]]):
Internal, used for constructing IndexMetadata without building
lengths from scratch.
coord_df_oid (Union[ObjectID, List[ObjectID]]):
Internal, used for constructing IndexMetadata without building
coord_df from scratch.

Returns:
A IndexMetadata backed by the specified pd.Index, partitioned off
Expand All @@ -134,6 +142,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
self._coord_df = coord_df_oid
self._lengths = lengths_oid

def copy(self):
    """Return a shallow copy of this metadata object.

    Only the object itself is duplicated; the coord_df and lengths
    references it holds are shared with the original.
    """
    duplicate = copy.copy(self)
    return duplicate

def _get__lengths(self):
if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \
(isinstance(self._lengths_cache, list) and
Expand Down Expand Up @@ -271,14 +282,23 @@ def squeeze(self, partition, index_within_partition):
self._coord_df.loc[partition_mask & index_within_partition_mask,
'index_within_partition'] -= 1

def copy(self):
return _IndexMetadata(coord_df_oid=self._coord_df,
lengths_oid=self._lengths)
def get_partition(self, partition_id):
    """Return the coord_df rows belonging to one block partition.

    Args:
        partition_id: Partition number to select.

    Returns:
        The subset of ``_coord_df`` whose ``partition`` column equals
        ``partition_id``.
    """
    selector = self._coord_df['partition'] == partition_id
    return self._coord_df[selector]

def sorted_index(self):
    """Return the index labels ordered by physical block position.

    Labels are sorted first by partition number, then by position
    within that partition.
    """
    ordering = ['partition', 'index_within_partition']
    reordered = self._coord_df.sort_values(ordering)
    return reordered.index


class _WrappingIndexMetadata(_IndexMetadata):
"""IndexMetadata implementation for index across a non-partitioned axis.
This implementation assumes the underlying index lies across one partition.

Deprecated:
This class was used to wrap column info when we have row partition.
"""

def __init__(self, index):
Expand Down
Loading