Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
col_metadata (_IndexMetadata):
Metadata for the new dataframe's columns
"""
self._row_metadata = self._col_metadata = None

# Check type of data and use appropriate constructor
if data is not None or (col_partitions is None and
Expand All @@ -97,9 +96,9 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
else:
# created this invariant to make sure we never have to go into the
# partitions to get the columns
assert columns is not None, \
"Columns not defined, must define columns for internal " \
"DataFrame creations"
assert columns is not None or col_metadata is not None, \
"Columns not defined, must define columns or col_metadata " \
"for internal DataFrame creations"

if block_partitions is not None:
# put in numpy array here to make accesses easier since it's 2D
Expand All @@ -109,18 +108,18 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
if row_partitions is not None:
axis = 0
partitions = row_partitions
axis_length = len(columns) if columns is not None else \
len(col_metadata)
elif col_partitions is not None:
axis = 1
partitions = col_partitions
axis_length = None

# TODO: write explicit tests for "short and wide"
# column partitions
self._block_partitions = \
_create_block_partitions(partitions, axis=axis,
length=len(columns))

if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()
length=axis_length)

# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
Expand All @@ -133,10 +132,19 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,

# Create the row and column index objects for using our partitioning.
# If the objects haven't been inherited, then generate them
if self._row_metadata is None:
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if index is not None:
self.index = index
else:
self._row_metadata = _IndexMetadata(self._block_partitions[:, 0],
index=index, axis=0)
if self._col_metadata is None:

if col_metadata is not None:
self._col_metadata = col_metadata.copy()
if columns is not None:
self.columns = columns
else:
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)

Expand Down Expand Up @@ -518,7 +526,8 @@ def add_prefix(self, prefix):
new_cols = self.columns.map(lambda x: str(prefix) + str(x))
return DataFrame(block_partitions=self._block_partitions,
columns=new_cols,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get rid of this line, since we only need the metadata now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't in this case, as discussed today. If we don't pass the columns, the metadata of the new dataframe won't reflect the column changes. Therefore, we need to either copy the metadata, modify the copy, and pass that copy to the constructor, or pass the new columns so that the constructor updates its own copy of the metadata object.

index=self.index)
col_metadata=self._col_metadata,
row_metadata=self._row_metadata)

def add_suffix(self, suffix):
"""Add a suffix to each of the column names.
Expand All @@ -529,7 +538,8 @@ def add_suffix(self, suffix):
new_cols = self.columns.map(lambda x: str(x) + str(suffix))
return DataFrame(block_partitions=self._block_partitions,
columns=new_cols,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: can we get rid of this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

index=self.index)
col_metadata=self._col_metadata,
row_metadata=self._row_metadata)

def applymap(self, func):
"""Apply a function to a DataFrame elementwise.
Expand All @@ -546,8 +556,8 @@ def applymap(self, func):
for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def copy(self, deep=True):
"""Creates a shallow copy of the DataFrame.
Expand Down Expand Up @@ -659,8 +669,6 @@ def isna(self):
lambda df: df.isna(), block) for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index,
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

Expand All @@ -678,8 +686,8 @@ def isnull(self):
for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def keys(self):
"""Get the info axis for the DataFrame.
Expand Down Expand Up @@ -1173,13 +1181,13 @@ def _cumulative_helper(self, func, axis):
if axis == 0:
new_cols = _map_partitions(func, self._col_partitions)
return DataFrame(col_partitions=new_cols,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)
else:
new_rows = _map_partitions(func, self._row_partitions)
return DataFrame(row_partitions=new_rows,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def cummax(self, axis=None, skipna=True, *args, **kwargs):
"""Perform a cumulative maximum across the DataFrame.
Expand Down Expand Up @@ -1869,7 +1877,7 @@ def head(self, n=5):
index = self._row_metadata.index[:n]

return DataFrame(col_partitions=new_dfs,
columns=self.columns,
col_metadata=self._col_metadata,
index=index)

def hist(self, data, column=None, by=None, grid=True, xlabelsize=None,
Expand Down Expand Up @@ -2498,8 +2506,8 @@ def notna(self):
lambda df: df.notna(), block) for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def notnull(self):
"""Perform notnull across the DataFrame.
Expand All @@ -2516,8 +2524,8 @@ def notnull(self):
for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def nsmallest(self, n, columns, keep='first'):
raise NotImplementedError(
Expand Down Expand Up @@ -2682,7 +2690,8 @@ def query_helper(df):
if inplace:
self._update_inplace(row_partitions=new_rows)
else:
return DataFrame(row_partitions=new_rows, columns=self.columns)
return DataFrame(row_partitions=new_rows,
col_metadata=self._col_metadata)

def radd(self, other, axis='columns', level=None, fill_value=None):
return self.add(other, axis, level, fill_value)
Expand Down Expand Up @@ -2939,8 +2948,8 @@ def round(self, decimals=0, *args, **kwargs):
for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def rpow(self, other, axis='columns', level=None, fill_value=None):
return self._single_df_op_helper(
Expand Down Expand Up @@ -3372,7 +3381,7 @@ def tail(self, n=5):

index = self._row_metadata.index[-n:]
return DataFrame(col_partitions=new_dfs,
columns=self.columns,
col_metadata=self._col_metadata,
index=index)

def take(self, indices, axis=0, convert=None, is_copy=True, **kwargs):
Expand Down Expand Up @@ -3764,8 +3773,8 @@ def _getitem_slice(self, key):

index = self.index[key]
return DataFrame(col_partitions=new_cols,
index=index,
columns=self.columns)
col_metadata=self._col_metadata,
index=index)

def __getattr__(self, key):
"""After regular attribute access, looks up the name in the columns
Expand Down Expand Up @@ -4073,8 +4082,8 @@ def __neg__(self):
for block in self._block_partitions])

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
col_metadata=self._col_metadata,
row_metadata=self._row_metadata)

def __sizeof__(self):
raise NotImplementedError(
Expand Down
Loading