diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index df7da328fee1..3c8cb94f5b11 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -35,10 +35,12 @@ _blocks_to_col, _blocks_to_row, _create_block_partitions, + _mask_block_partitions, _inherit_docstrings, _reindex_helper) from . import get_npartitions from .index_metadata import _IndexMetadata +from .indexing import _Loc_Indexer, _iLoc_Indexer @_inherit_docstrings(pd.DataFrame) @@ -46,7 +48,8 @@ class DataFrame(object): def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False, col_partitions=None, row_partitions=None, - block_partitions=None, row_metadata=None, col_metadata=None): + block_partitions=None, row_metadata=None, col_metadata=None, + partial=False): """Distributed DataFrame object backed by Pandas dataframes. Args: @@ -69,7 +72,11 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, Metadata for the new dataframe's rows col_metadata (_IndexMetadata): Metadata for the new dataframe's columns + partial (boolean): + Internal: row_metadata and col_metadata only covers part of the + block partitions. (Used in index 'vew' accessor) """ + self.partial = partial self._row_metadata = self._col_metadata = None # Check type of data and use appropriate constructor @@ -93,6 +100,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, axis = 0 columns = pd_df.columns index = pd_df.index + self._row_metadata = self._col_metadata = None else: # created this invariant to make sure we never have to go into the # partitions to get the columns @@ -103,25 +111,28 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, if block_partitions is not None: # put in numpy array here to make accesses easier since it's 2D self._block_partitions = np.array(block_partitions) + if row_metadata is not None: + self._row_metadata = row_metadata.copy() + if col_metadata is not None: + self._col_metadata = col_metadata.copy() assert self._block_partitions.ndim == 2, \ "Block Partitions must be 2D." else: if row_partitions is not None: axis = 0 partitions = row_partitions + if row_metadata is not None: + self._row_metadata = row_metadata.copy() elif col_partitions is not None: axis = 1 partitions = col_partitions + if col_metadata is not None: + self._col_metadata = col_metadata.copy() self._block_partitions = \ _create_block_partitions(partitions, axis=axis, length=len(columns)) - if row_metadata is not None: - self._row_metadata = row_metadata.copy() - if col_metadata is not None: - self._col_metadata = col_metadata.copy() - # Sometimes we only get a single column or row, which is # problematic for building blocks from the partitions, so we # add whatever dimension we're missing from the input. 
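For readers following the constructor changes above: everything in this patch keys off the coordinate table carried by the `_IndexMetadata` objects that are now copied per branch. Below is a small, self-contained illustration (plain pandas, no Ray) of what that table looks like; `labels`, `partition_sizes`, and `coord_df` are names invented for the example, not library API.

import pandas as pd

# Toy illustration (not the library API): the coordinate table held by
# _IndexMetadata maps every label to the block that owns it, one row per
# label, with the partition id and the label's offset inside that block.
labels = ['a', 'b', 'c', 'd', 'e']
partition_sizes = [2, 3]            # e.g. two row partitions of 2 and 3 rows
coords = []
for part, size in enumerate(partition_sizes):
    for offset in range(size):
        coords.append({'partition': part, 'index_within_partition': offset})
coord_df = pd.DataFrame(coords, index=labels)

print(coord_df.loc['d'])   # label 'd' lives in partition 1 at offset 1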
@@ -138,6 +149,21 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, self._col_metadata = _IndexMetadata(self._block_partitions[0, :], index=columns, axis=1) + def _get_block_partitions(self): + if (self._row_metadata is None or self._col_metadata is None)\ + or (not self.partial): + return self._block_partitions_data + else: # is partial, need mask + oid_arr = _mask_block_partitions(self._block_partitions_data, + self._row_metadata, + self._col_metadata) + return oid_arr + + def _set_block_partitions(self, new_block_partitions): + self._block_partitions_data = new_block_partitions + + _block_partitions = property(_get_block_partitions, _set_block_partitions) + def _get_row_partitions(self): return [_blocks_to_row.remote(*part) for part in self._block_partitions] @@ -408,6 +434,10 @@ def dtypes(self): Returns: The dtypes for this DataFrame. """ + # Deal with empty column case + if self.shape[1] == 0: + return pd.Series() + # The dtypes are common across all partitions. # The first partition will be enough. result = ray.get(_deploy_func.remote(lambda df: df.dtypes, @@ -3775,9 +3805,7 @@ def loc(self): We currently support: single label, list array, slice object We do not support: boolean array, callable """ - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return _Loc_Indexer(self) @property def is_copy(self): @@ -3812,6 +3840,4 @@ def iloc(self): We currently support: single label, list array, slice object We do not support: boolean array, callable """ - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + return _iLoc_Indexer(self) diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 235809ec7a35..a43132f4e4a4 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -2,6 +2,8 @@ import numpy as np import ray +import copy + from .utils import ( _build_index, _build_columns) @@ -122,6 +124,12 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None, dfs ([ObjectID]): ObjectIDs of dataframe partitions index (pd.Index): Index of the Ray DataFrame. axis: Axis of partition (0=row partitions, 1=column partitions) + lengths_oid (Union[ObjectId, [ObjectId]): + Internal, used for constructing IndexMetadata without building + lengths from scratch. + coord_df_oid (Union[ObjectId, [ObjectId]): + Internal, used for constructing IndexMetadata without building + coord_df from scratch. 
Returns: A IndexMetadata backed by the specified pd.Index, partitioned off @@ -134,6 +142,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None, self._coord_df = coord_df_oid self._lengths = lengths_oid + def copy(self): + return copy.copy(self) + def _get__lengths(self): if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \ (isinstance(self._lengths_cache, list) and @@ -271,14 +282,23 @@ def squeeze(self, partition, index_within_partition): self._coord_df.loc[partition_mask & index_within_partition_mask, 'index_within_partition'] -= 1 - def copy(self): - return _IndexMetadata(coord_df_oid=self._coord_df, - lengths_oid=self._lengths) + def get_partition(self, partition_id): + """Return a view of coord_df where partition = partition_id + """ + return self._coord_df[self._coord_df.partition == partition_id] + + def sorted_index(self): + return (self._coord_df + .sort_values(['partition', 'index_within_partition']) + .index) class _WrappingIndexMetadata(_IndexMetadata): """IndexMetadata implementation for index across a non-partitioned axis. This implementation assumes the underlying index lies across one partition. + + Deprecated: + This class was used to wrap column info when we have row partition. """ def __init__(self, index): diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py index cba4ff8728fc..a08f77ddb206 100644 --- a/python/ray/dataframe/indexing.py +++ b/python/ray/dataframe/indexing.py @@ -1,104 +1,431 @@ +""" Indexing class layout as follows: + _Location_Indexer_Base is the parent class for Loc and iLoc Indexer + Base class is responsible for triage. + Child class is responsible for item lookup and write. +""" import pandas as pd +import numpy as np import ray -from .dataframe import _deploy_func +from warnings import warn + +from pandas.api.types import (is_scalar, is_list_like, is_bool) +from pandas.core.dtypes.common import is_integer + +from .utils import (_blocks_to_col, _get_nan_block_id, extractor) +from .index_metadata import _IndexMetadata + + +def is_slice(x): + return isinstance(x, slice) + + +def is_2d(x): + return is_list_like(x) or is_slice(x) + + +def is_tuple(x): + return isinstance(x, tuple) class _Location_Indexer_Base(): """Base class for location indexer like loc and iloc - This class abstract away commonly used method """ def __init__(self, ray_df): self.df = ray_df + self.col_coord_df = ray_df._col_metadata._coord_df + self.row_coord_df = ray_df._row_metadata._coord_df + self.block_oids = ray_df._block_partitions def __getitem__(self, key): + # Pathological Case: df.loc[1:,] + if is_tuple(key) and len(key) == 1: + key = key[0] + + # The one argument case is equivalent to full slice in 2nd dim. + # iloc[3] == iloc[3, :] for a dataframe if not isinstance(key, tuple): - # The one argument case is equivalent to full slice in 2nd dim. - return self.locate_2d(key, slice(None)) - else: - return self.locate_2d(*key) - - def _get_lookup_dict(self, ray_partition_idx): - if ray_partition_idx.ndim == 1: # Single row matched - position = (ray_partition_idx['partition'], - ray_partition_idx['index_within_partition']) - rows_to_lookup = {position[0]: [position[1]]} - if ray_partition_idx.ndim == 2: # Multiple rows matched - # We copy ray_partition_idx because it allows us to - # do groupby. This might not be the most efficient method. - # And have room to optimize. 
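Stepping back from the helper being deleted here: the new `__getitem__`/`_triage_getitem` path added above first normalizes the key and then dispatches on whether each axis locator is a scalar or 2D (list-like or slice). A minimal, Ray-free sketch of that triage follows; `normalize_key` and `classify` are hypothetical helper names used only for this illustration.

from pandas.api.types import is_scalar, is_list_like

# Sketch of the triage performed by _Location_Indexer_Base: normalize the
# key into a (row_loc, col_loc) pair, then classify each axis locator to
# pick the lookup strategy (scalar cell, row/col series, or dataframe view).
def normalize_key(key):
    if isinstance(key, tuple) and len(key) == 1:   # pathological df.loc[1:, ]
        key = key[0]
    if not isinstance(key, tuple):                 # df.loc[3] == df.loc[3, :]
        return key, slice(None)
    return key

def classify(loc):
    if is_scalar(loc):
        return 'scalar'
    if is_list_like(loc) or isinstance(loc, slice):
        return '2d'
    return 'unsupported'

print(normalize_key(3))                              # (3, slice(None, None, None))
print(classify(['a', 'b']), classify(slice(1, 5)), classify(7))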
- ray_partition_idx = ray_partition_idx.copy() - rows_to_lookup = ray_partition_idx.groupby('partition').aggregate( - lambda x: list(x)).to_dict()['index_within_partition'] - return rows_to_lookup - - def locate_2d(self, row_label, col_label): + return self._triage_getitem(key, slice(None)) + + return self._triage_getitem(*key) + + def _triage_getitem(self, row_loc, col_loc): + # iloc[1,2] returns a scaler + if is_scalar(row_loc) and is_scalar(col_loc): + return self._get_scaler(row_loc, col_loc) + + # iloc[1,:] returns a row series: row 1 + if is_scalar(row_loc) and is_2d(col_loc): + return self._get_row_series(row_loc, col_loc) + + # iloc[:,1] returns a columns series: col 1 + if is_2d(row_loc) and is_scalar(col_loc): + return self._get_col_series(row_loc, col_loc) + + if is_2d(row_loc) and is_2d(col_loc): + return self._get_dataframe_view(row_loc, col_loc) + + def _get_scaler(self, row_loc, col_loc): pass - def _map_partition(self, lookup_dict, col_lst, indexer='loc'): - """Apply retrieval function to a lookup_dict - in the form of {partition_id: [idx]}. + def _get_col_series(self, row_loc, col_loc): + pass - Returns: - retrieved_rows_remote: a list of object ids for pd_df - """ - assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" + def _get_row_series(self, row_loc, col_loc): + pass + + def _get_dataframe_view(self, row_loc, col_loc): + pass - if indexer == 'loc': - def retrieve_func(df, idx_lst, col_label): - return df.loc[idx_lst, col_label] - elif indexer == 'iloc': +def is_enlargement(locator, corrd_df): + # Enlargement happens when you trying to locate using labels isn't in the + # original index. In other words, enlargement == adding NaNs ! + if is_list_like(locator) and not is_slice( + locator) and len(locator) > 0 and not is_bool(locator[0]): + n_diff_elems = len(pd.Index(locator).difference(corrd_df.index)) + is_enlargement_boolean = n_diff_elems > 0 + if is_enlargement_boolean: + warn( + FutureWarning(""" +Passing list-likes to .loc or [] with any missing label will raise +KeyError in the future, you can use .reindex() as an alternative. - def retrieve_func(df, idx_lst, col_idx): - return df.iloc[idx_lst, col_idx] +See the documentation here: +http://pandas.pydata.org/pandas-docs/stable/indexing.html#deprecate-loc-reindex-listlike + """)) - retrieved_rows_remote = [ - _deploy_func.remote(retrieve_func, - self.df._row_partitions[partition], - idx_to_lookup, col_lst) - for partition, idx_to_lookup in lookup_dict.items() - ] - return retrieved_rows_remote + return is_enlargement_boolean + return False class _Loc_Indexer(_Location_Indexer_Base): """A indexer for ray_df.loc[] functionality""" - def locate_2d(self, row_label, col_label): - index_loc = self.df._row_index.loc[row_label] - lookup_dict = self._get_lookup_dict(index_loc) - retrieved_rows_remote = self._map_partition( - lookup_dict, col_label, indexer='loc') - joined_df = pd.concat(ray.get(retrieved_rows_remote)) + def _get_scaler(self, row_loc, col_loc): + row_loc_result = self.row_coord_df.loc[row_loc] + if row_loc_result.ndim == 2: + # We can facing duplicate index values + return self._get_col_series(row_loc, col_loc) + row_part, row_idx = row_loc_result + col_part, col_idx = self.col_coord_df.loc[col_loc] + chunk_oid = self.block_oids[row_part, col_part] + result_oid = extractor.remote(chunk_oid, row_idx, col_idx) + return ray.get(result_oid) - if index_loc.ndim == 2: - # The returned result need to be indexed series/df - # Re-index is needed. 
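The `is_enlargement` check introduced above boils down to index set arithmetic: any requested label that is missing from the coordinate table means the result must be padded with NaNs, and (per pandas' deprecation policy) a FutureWarning is emitted. A standalone illustration with plain pandas; `existing` and `requested` are example values.

import pandas as pd

# Enlargement = asking .loc for labels the index does not contain; those
# labels come back as NaN instead of raising (behaviour pandas deprecates).
existing = pd.Index(['a', 'b', 'c'])
requested = ['b', 'c', 'z']

missing = pd.Index(requested).difference(existing)
common = pd.Index(requested).intersection(existing)

print(list(missing))        # ['z']  -> would be filled from NaN blocks
print(list(common))         # ['b', 'c']
print(len(missing) > 0)     # True -> this lookup counts as an enlargement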
- joined_df.index = index_loc.index + # Series Helper + def _get_series_blocks(self, row_loc, col_loc, primary_axis='row'): + # primary_axis := axis where the locator is a scaler + assert primary_axis in ['row', 'col'] - if isinstance(row_label, int) or isinstance(row_label, str): - return joined_df.squeeze(axis=0) + if primary_axis == 'row': + row_loc = [row_loc] else: - return joined_df + col_loc = [col_loc] + + # Have to do copy before we do groupby + col_part_table = self.col_coord_df.loc[col_loc].copy() + row_part_table = self.row_coord_df.loc[row_loc].copy() + + result_oids = [] + for row_part, row_partition_data in row_part_table.groupby( + 'partition'): + for col_part, col_partition_data in col_part_table.groupby( + 'partition'): + block_oid = self.block_oids[row_part, col_part] + row_idx = row_partition_data['index_within_partition'] + col_idx = col_partition_data['index_within_partition'] + + if primary_axis == 'row': + row_idx = row_idx.iloc[0] + else: + col_idx = col_idx.iloc[0] + + result_oid = extractor.remote(block_oid, row_idx, col_idx) + result_oids.append(result_oid) + + if primary_axis == 'row': + series_index = col_part_table.index + series_name = row_loc[0] + else: + series_index = row_part_table.index + series_name = col_loc[0] + + return result_oids, series_index, series_name + + def _post_process_series(self, + result_oids, + index, + name, + primary_axis='row'): + series = ray.get(_blocks_to_col.remote(*result_oids)) + series.index = index + series.name = name + return series + + def _get_col_series(self, row_loc, col_loc): + if is_enlargement(row_loc, self.row_coord_df): + return self._get_col_series_enlarge(row_loc, col_loc) + + result_oids, index, name = self._get_series_blocks( + row_loc, col_loc, primary_axis='col') + col_series = self._post_process_series( + result_oids, index, name, primary_axis='col') + col_series = col_series.astype(self.df.dtypes[col_loc]) + + return col_series + + def _compute_enlarge_labels(self, locator, axis='row'): + assert axis in ['row', 'col'] + + locator_as_index = pd.Index(locator) + if axis == 'row': + base_index = self.row_coord_df.index + else: + base_index = self.col_coord_df.index + + nan_labels = locator_as_index.difference(base_index) + common_labels = locator_as_index.intersection(base_index) + + if len(common_labels) == 0: + raise KeyError( + 'None of [{labels}] are in the [{base_index_name}]'.format( + labels=list(locator_as_index), + base_index_name=axis + ' index')) + + return nan_labels, common_labels + + def _get_col_series_enlarge(self, row_loc, col_loc): + nan_labels, common_labels = self._compute_enlarge_labels( + row_loc, axis='row') + nan_series = pd.Series({name: np.NaN for name in nan_labels}) + + col_series = self._get_col_series(common_labels, col_loc) + col_series = pd.concat([col_series, nan_series]) + col_series = col_series.reindex(row_loc) + + return col_series + + def _get_row_series(self, row_loc, col_loc): + if is_enlargement(col_loc, self.col_coord_df): + return self._get_row_series_enlarge(row_loc, col_loc) + + result_oids, index, name = self._get_series_blocks( + row_loc, col_loc, primary_axis='row') + row_series = self._post_process_series(result_oids, index, name) + + return row_series + + def _get_row_series_enlarge(self, row_loc, col_loc): + nan_labels, common_labels = self._compute_enlarge_labels( + col_loc, axis='col') + nan_series = pd.Series({name: np.NaN for name in nan_labels}) + + row_series = self._get_row_series(row_loc, common_labels) + row_series = pd.concat([row_series, 
nan_series]) + row_series = row_series.reindex(col_loc) + + return row_series + + def _enlarge_axis(self, locator, axis, row_meta, col_meta): + """Add rows/columns to block partitions according to locator. + + Returns: + metadata (_IndexMetadata) + """ + assert axis in ['row', 'col'] + nan_labels, _ = self._compute_enlarge_labels(locator, axis=axis) + n_nan_labels = len(nan_labels) + blk_part_n_row, blk_part_n_col = self.block_oids.shape + + if axis == 'row': + nan_blk_lens = col_meta._lengths + nan_blks = np.array([[ + _get_nan_block_id(n_nan_labels, n_cols) + for n_cols in nan_blk_lens + ]]) + else: + nan_blk_lens = row_meta._lengths + nan_blks = np.array([[ + _get_nan_block_id(n_rows, n_nan_labels) + for n_rows in nan_blk_lens + ]]).T + + self.block_oids = np.concatenate( + [self.block_oids, nan_blks], axis=0 if axis == 'row' else 1) + + nan_coord_df = pd.DataFrame(data=[{ + '': name, + 'partition': blk_part_n_row if axis == 'row' else blk_part_n_col, + 'index_within_partition': i + } for name, i in zip(nan_labels, np.arange(n_nan_labels))]).set_index( + '') + + coord_df = pd.concat([ + self.row_coord_df + if axis == 'row' else self.col_coord_df, nan_coord_df + ]) + coord_df = coord_df.loc[locator] + + lens = row_meta._lengths if axis == 'row' else col_meta._lengths + lens = np.concatenate([lens, np.array([n_nan_labels])]) + + row_metadata_view = _IndexMetadata( + coord_df_oid=coord_df, lengths_oid=lens) + + return row_metadata_view + + def _get_dataframe_view(self, row_loc, col_loc): + from .dataframe import DataFrame + + if is_enlargement(row_loc, self.row_coord_df): + row_metadata_view = self._enlarge_axis( + row_loc, + axis='row', + row_meta=self.df._row_metadata, + col_meta=self.df._col_metadata) + else: + row_coord_df = self.row_coord_df.loc[row_loc] + row_metadata_view = _IndexMetadata( + coord_df_oid=row_coord_df, + lengths_oid=self.df._row_metadata._lengths) + + if is_enlargement(col_loc, self.col_coord_df): + col_metadata_view = self._enlarge_axis( + col_loc, + axis='col', + row_meta=row_metadata_view, + col_meta=self.df._col_metadata) + else: + col_coord_df = self.col_coord_df.loc[col_loc] + col_metadata_view = _IndexMetadata( + coord_df_oid=col_coord_df, + lengths_oid=self.df._col_metadata._lengths) + + df_view = DataFrame( + block_partitions=self.block_oids, + row_metadata=row_metadata_view, + col_metadata=col_metadata_view, + index=row_metadata_view.index, + columns=col_metadata_view.index, + partial=True) + + return df_view class _iLoc_Indexer(_Location_Indexer_Base): """A indexer for ray_df.iloc[] functionality""" - def locate_2d(self, row_idx, col_idx): - index_loc = self.df._row_index.iloc[row_idx] - lookup_dict = self._get_lookup_dict(index_loc) - retrieved_rows_remote = self._map_partition( - lookup_dict, col_idx, indexer='iloc') - joined_df = pd.concat(ray.get(retrieved_rows_remote)) + # Note (simon) + # This is implemented by pruning and slightly modifying code from Loc + # Changes done: + # - Find and Replace loc with iloc + # - Make modificatino to series naming + # - Prune enlargement related functions - if index_loc.ndim == 2: - # The returned result need to be indexed series/df - # Re-index is needed. - joined_df.index = index_loc.index + def _get_scaler(self, row_loc, col_loc): + # Scaler access needs type checking. 
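Both indexers' `_get_series_blocks` helpers follow the same pattern: group the selected coordinates by owning partition, slice the needed offsets out of each block once, then stitch the pieces back together. Below is a Ray-free simulation of that gather using plain pandas; `blocks`, `coord_df`, and `wanted` are invented example data, and the concatenation order matches the request here only because the labels are already grouped by partition.

import numpy as np
import pandas as pd

# Simulate the per-partition gather (no Ray): each block is an ordinary
# DataFrame, and the coordinate table records which block and offset
# holds each label.
blocks = [pd.DataFrame(np.arange(6).reshape(3, 2)),       # partition 0
          pd.DataFrame(np.arange(6, 14).reshape(4, 2))]   # partition 1

coord_df = pd.DataFrame(
    {'partition':              [0, 0, 0, 1, 1, 1, 1],
     'index_within_partition': [0, 1, 2, 0, 1, 2, 3]},
    index=['a', 'b', 'c', 'd', 'e', 'f', 'g'])

wanted = ['b', 'e', 'g']            # fetch column 0 for these row labels
pieces = []
for part, grp in coord_df.loc[wanted].groupby('partition'):
    offsets = grp['index_within_partition'].tolist()
    pieces.append(blocks[part].iloc[offsets, 0])   # one slice per block

result = pd.concat(pieces).reset_index(drop=True)
result.index = pd.Index(wanted)     # relabel with the requested labels
print(result)                       # b -> 2, e -> 8, g -> 12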
+ if not (is_integer(row_loc) and is_integer(col_loc)): + raise ValueError(""" + Location based indexing can only have + [integer, integer slice (START point is INCLUDED, + END point is EXCLUDED), listlike of integers, + boolean array] types""") + row_loc_result = self.row_coord_df.iloc[row_loc] + if row_loc_result.ndim == 2: + # We can facing duplicate index values + return self._get_col_series(row_loc, col_loc) + row_part, row_idx = row_loc_result + col_part, col_idx = self.col_coord_df.iloc[col_loc] + chunk_oid = self.block_oids[row_part, col_part] + result_oid = extractor.remote(chunk_oid, row_idx, col_idx) + return ray.get(result_oid) - if isinstance(row_idx, int) or isinstance(row_idx, str): - return joined_df.squeeze(axis=0) + # Series Helper + def _get_series_blocks(self, row_loc, col_loc, primary_axis='row'): + # primary_axis := axis where the locator is a scaler + assert primary_axis in ['row', 'col'] + + if primary_axis == 'row': + row_loc = [row_loc] else: - return joined_df + col_loc = [col_loc] + + # Must copy before groupby. Otherwise buffer isn't writable + col_part_table = self.col_coord_df.iloc[col_loc].copy() + row_part_table = self.row_coord_df.iloc[row_loc].copy() + + result_oids = [] + for row_part, row_partition_data in row_part_table.groupby( + 'partition'): + for col_part, col_partition_data in col_part_table.groupby( + 'partition'): + block_oid = self.block_oids[row_part, col_part] + row_idx = row_partition_data['index_within_partition'] + col_idx = col_partition_data['index_within_partition'] + + if primary_axis == 'row': + row_idx = row_idx.iloc[0] + else: + col_idx = col_idx.iloc[0] + + result_oid = extractor.remote(block_oid, row_idx, col_idx) + result_oids.append(result_oid) + + if primary_axis == 'row': + series_index = col_part_table.index + series_name = row_part_table.index[0] + else: + series_index = row_part_table.index + series_name = col_part_table.index[0] + + return result_oids, series_index, series_name + + def _post_process_series(self, + result_oids, + index, + name, + primary_axis='row'): + series = ray.get(_blocks_to_col.remote(*result_oids)) + series.index = index + series.name = name + return series + + def _get_col_series(self, row_loc, col_loc): + result_oids, index, name = self._get_series_blocks( + row_loc, col_loc, primary_axis='col') + col_series = self._post_process_series( + result_oids, index, name, primary_axis='col') + col_series = col_series.astype(self.df.dtypes[col_loc]) + + return col_series + + def _get_row_series(self, row_loc, col_loc): + result_oids, index, name = self._get_series_blocks( + row_loc, col_loc, primary_axis='row') + row_series = self._post_process_series(result_oids, index, name) + + return row_series + + def _get_dataframe_view(self, row_loc, col_loc): + from .dataframe import DataFrame + + row_coord_df = self.row_coord_df.iloc[row_loc] + row_metadata_view = _IndexMetadata( + coord_df_oid=row_coord_df, + lengths_oid=self.df._row_metadata._lengths) + + col_coord_df = self.col_coord_df.iloc[col_loc] + col_metadata_view = _IndexMetadata( + coord_df_oid=col_coord_df, + lengths_oid=self.df._col_metadata._lengths) + + df_view = DataFrame( + block_partitions=self.block_oids, + row_metadata=row_metadata_view, + col_metadata=col_metadata_view, + index=row_metadata_view.index, + columns=col_metadata_view.index, + partial=True) + + return df_view diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 3aee593bda05..d47a8de50ae6 100644 --- a/python/ray/dataframe/utils.py +++ 
b/python/ray/dataframe/utils.py @@ -8,6 +8,18 @@ from . import get_npartitions +__NAN_BLOCKS = dict() + + +def _get_nan_block_id(n_row=1, n_col=1): + global __NAN_BLOCKS + + shape = (n_row, n_col) + if shape not in __NAN_BLOCKS: + arr = np.tile(np.array(np.NaN), shape) + __NAN_BLOCKS[shape] = ray.put(pd.DataFrame(data=arr)) + return __NAN_BLOCKS[shape] + def _get_lengths(df): """Gets the length of the dataframe. @@ -72,9 +84,11 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None): row_partitions.append(top) temp_df = temp_df[row_chunksize:] else: - if len(df) > row_chunksize: - temp_df.reset_index(drop=True, inplace=True) - temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) + # Handle the last chunk correctly. + # This call is necessary to prevent modifying original df + temp_df = temp_df[:] + temp_df.reset_index(drop=True, inplace=True) + temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) row_partitions.append(ray.put(temp_df)) return row_partitions @@ -107,14 +121,45 @@ def to_pandas(df): Returns: A new pandas DataFrame. """ - if df._row_partitions is not None: - pd_df = pd.concat(ray.get(df._row_partitions)) - else: - pd_df = pd.concat(ray.get(df._col_partitions), - axis=1) - pd_df.index = df.index - pd_df.columns = df.columns - return pd_df + if 0 in df.shape: # One axis is empty + return pd.DataFrame(index=df.index, columns=df.columns) + data = pd.concat(ray.get(df._col_partitions), axis=1, copy=False) + data.index = df.index + data.columns = df.columns + return data + + +@ray.remote +def extractor(df_chunk, row_loc, col_loc): + return df_chunk.iloc[row_loc, col_loc] + + +def _mask_block_partitions(blk_partitions, row_metadata, col_metadata): + """Return the squeezed/expanded block partitions as defined by + row_metadata and col_metadata. + + Note: + Very naive implementation. Extract one scaler at a time in a double + for loop. + """ + col_df = col_metadata._coord_df + row_df = row_metadata._coord_df + + result_oids = [] + shape = (len(row_df.index), len(col_df.index)) + + for _, row_partition_data in row_df.iterrows(): + for _, col_partition_data in col_df.iterrows(): + row_part = row_partition_data.partition + col_part = col_partition_data.partition + block_oid = blk_partitions[row_part, col_part] + + row_idx = row_partition_data['index_within_partition'] + col_idx = col_partition_data['index_within_partition'] + + result_oid = extractor.remote(block_oid, [row_idx], [col_idx]) + result_oids.append(result_oid) + return np.array(result_oids).reshape(shape) @ray.remote @@ -230,8 +275,11 @@ def create_blocks(df, npartitions, axis): @ray.remote def _blocks_to_col(*partition): - return pd.concat(partition, axis=0, copy=False)\ - .reset_index(drop=True) + if len(partition): + return pd.concat(partition, axis=0, copy=False)\ + .reset_index(drop=True) + else: + return pd.Series() @ray.remote
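Finally, the `_get_nan_block_id` helper added to utils.py memoizes one all-NaN block per shape, so enlargement never rebuilds (or re-puts into the object store) the same filler twice. Here is a local, Ray-free version of the same idea; `get_nan_block` and `_NAN_BLOCKS` are illustrative names, with `ray.put` dropped so the snippet runs standalone.

import numpy as np
import pandas as pd

# Memoized NaN filler blocks, as in _get_nan_block_id but without ray.put:
# NaN blocks are read-only filler, so one object per shape can be shared.
_NAN_BLOCKS = {}

def get_nan_block(n_row=1, n_col=1):
    shape = (n_row, n_col)
    if shape not in _NAN_BLOCKS:
        _NAN_BLOCKS[shape] = pd.DataFrame(np.full(shape, np.nan))
    return _NAN_BLOCKS[shape]

a = get_nan_block(2, 3)
b = get_nan_block(2, 3)
print(a is b)      # True -- the (2, 3) block was built exactly once
print(a.shape)     # (2, 3)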