From f46892ad49937764f514a22522dfad945e2da3fd Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 12 Sep 2018 10:00:29 -0700 Subject: [PATCH 01/10] Test changes to io --- .../partitioning/partition_collections.py | 3 + modin/pandas/io.py | 65 ++++++++++++------- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 8d54d9d43bc..7c7fbbb02b8 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -672,6 +672,9 @@ def __getitem__(self, key): cls = type(self) return cls(self.partitions[key]) + def __len__(self): + return sum(self.block_lengths) + class RayBlockPartitions(BlockPartitions): """This method implements the interface in `BlockPartitions`.""" diff --git a/modin/pandas/io.py b/modin/pandas/io.py index b7e2309545b..c95471eaa7b 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -17,6 +17,10 @@ from .dataframe import ray, DataFrame from . import get_npartitions from .utils import from_pandas, _partition_pandas_dataframe +from ..data_management.partitioning.partition_collections import RayBlockPartitions +from ..data_management.partitioning.remote_partition import RayRemotePartition +from ..data_management.partitioning.axis_partition import split_result_of_axis_func_pandas +from ..data_management.data_manager import PandasDataManager PQ_INDEX_REGEX = re.compile('__index_level_\d+__') @@ -106,7 +110,7 @@ def _skip_header(f, kwargs={}): return lines_read -def _read_csv_from_file(filepath, npartitions, kwargs={}): +def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): """Constructs a DataFrame from a CSV file. Args: @@ -119,13 +123,13 @@ def _read_csv_from_file(filepath, npartitions, kwargs={}): """ empty_pd_df = pandas.read_csv( filepath, **dict(kwargs, nrows=0, skipfooter=0, skip_footer=0)) - names = empty_pd_df.columns + column_names = empty_pd_df.columns skipfooter = kwargs["skipfooter"] skip_footer = kwargs["skip_footer"] partition_kwargs = dict( - kwargs, header=None, names=names, skipfooter=0, skip_footer=0) + kwargs, header=None, names=column_names, skipfooter=0, skip_footer=0) with open(filepath, "rb") as f: # Get the BOM if necessary prefix = b"" @@ -142,9 +146,10 @@ def _read_csv_from_file(filepath, npartitions, kwargs={}): # Launch tasks to read partitions partition_ids = [] - index_ids = [] total_bytes = os.path.getsize(filepath) chunk_size = max(1, (total_bytes - f.tell()) // npartitions) + num_splits = RayBlockPartitions._compute_num_partitions() + while f.tell() < total_bytes: start = f.tell() f.seek(chunk_size, os.SEEK_CUR) @@ -154,24 +159,34 @@ def _read_csv_from_file(filepath, npartitions, kwargs={}): kwargs["skipfooter"] = skipfooter kwargs["skip_footer"] = skip_footer - partition_id, index_id = _read_csv_with_offset.remote( - filepath, start, f.tell(), partition_kwargs_id, prefix_id) - partition_ids.append(partition_id) - index_ids.append(index_id) + partition_id = _read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits) + partition_ids.append([RayRemotePartition(obj) for obj in partition_id]) - # Construct index - index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \ - if kwargs["index_col"] is not None else None + new_data = RayBlockPartitions(np.array(partition_ids)) + index_col = kwargs.get("index_col", None) - df = DataFrame(row_partitions=partition_ids, columns=names, index=index_id) + if index_col is not None: + new_index = new_data.get_indices(0, lambda df: df.index) + else: + new_index = pandas.RangeIndex(len(new_data)) - skipfooter = kwargs["skipfooter"] or kwargs["skip_footer"] - if skipfooter: - df = df.drop(df.index[-skipfooter:]) - if kwargs["squeeze"] and len(df.columns) == 1: - return df[df.columns[0]] + new_manager = PandasDataManager(new_data, new_index, column_names) + new_df = DataFrame(data_manager=new_manager) + return new_df - return df + # Construct index + # index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \ + # if kwargs["index_col"] is not None else None + # + # df = DataFrame(row_partitions=partition_ids, columns=names, index=index_id) + # + # skipfooter = kwargs["skipfooter"] or kwargs["skip_footer"] + # if skipfooter: + # df = df.drop(df.index[-skipfooter:]) + # if kwargs["squeeze"] and len(df.columns) == 1: + # return df[df.columns[0]] + # + # return df def _read_csv_from_pandas(filepath_or_buffer, kwargs): @@ -366,7 +381,7 @@ def read_csv(filepath_or_buffer, return _read_csv_from_pandas(filepath_or_buffer, kwargs) - return _read_csv_from_file(filepath_or_buffer, get_npartitions(), kwargs) + return _read_csv_from_file_pandas_backed_ray(filepath_or_buffer, get_npartitions(), kwargs) def read_json(path_or_buf=None, @@ -578,17 +593,17 @@ def get_index(index_name, *partition_indices): return index -@ray.remote(num_return_vals=2) -def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''): - bio = open(fn, 'rb') +@ray.remote +def _read_csv_with_offset(fname, num_splits, start, end, kwargs={}, header=b''): + bio = open(fname, 'rb') bio.seek(start) to_read = header + bio.read(end - start) bio.close() pandas_df = pandas.read_csv(BytesIO(to_read), **kwargs) - index = pandas_df.index + # index = pandas_df.index # Partitions must have RangeIndex - pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) - return pandas_df, index + # pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) + return split_result_of_axis_func_pandas(1, num_splits, pandas_df) @ray.remote From 050effc374dcb3cff4526fff3994294522ddcc61 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 12 Sep 2018 12:06:13 -0700 Subject: [PATCH 02/10] Update io changes --- modin/pandas/io.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index c95471eaa7b..460bfbdae26 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -20,7 +20,7 @@ from ..data_management.partitioning.partition_collections import RayBlockPartitions from ..data_management.partitioning.remote_partition import RayRemotePartition from ..data_management.partitioning.axis_partition import split_result_of_axis_func_pandas -from ..data_management.data_manager import PandasDataManager +from ..data_management.data_manager import RayPandasDataManager PQ_INDEX_REGEX = re.compile('__index_level_\d+__') @@ -146,6 +146,7 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): # Launch tasks to read partitions partition_ids = [] + index_ids = [] total_bytes = os.path.getsize(filepath) chunk_size = max(1, (total_bytes - f.tell()) // npartitions) num_splits = RayBlockPartitions._compute_num_partitions() @@ -159,19 +160,23 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): kwargs["skipfooter"] = skipfooter kwargs["skip_footer"] = skip_footer - partition_id = _read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits) - partition_ids.append([RayRemotePartition(obj) for obj in partition_id]) - - new_data = RayBlockPartitions(np.array(partition_ids)) + partition_id = _read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1) + partition_ids.append(partition_id[:-1]) + index_ids.append(partition_id[-1]) index_col = kwargs.get("index_col", None) - - if index_col is not None: - new_index = new_data.get_indices(0, lambda df: df.index) + print("Submission ended") + if index_col is None: + new_index = pandas.RangeIndex(len([i for idx in index_ids for i in ray.get(idx)])) else: - new_index = pandas.RangeIndex(len(new_data)) - new_manager = PandasDataManager(new_data, new_index, column_names) + new_index_ids = ray.get(index_ids) + new_index = new_index_ids[0] + new_index.append(new_index_ids[1:]) + print("Index endd") + new_manager = RayPandasDataManager._from_old_block_partitions(np.array(partition_ids), new_index, column_names) + print("New manager created") new_df = DataFrame(data_manager=new_manager) + print("New frame created") return new_df # Construct index @@ -600,10 +605,10 @@ def _read_csv_with_offset(fname, num_splits, start, end, kwargs={}, header=b''): to_read = header + bio.read(end - start) bio.close() pandas_df = pandas.read_csv(BytesIO(to_read), **kwargs) - # index = pandas_df.index + index = pandas_df.index # Partitions must have RangeIndex - # pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) - return split_result_of_axis_func_pandas(1, num_splits, pandas_df) + pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) + return split_result_of_axis_func_pandas(1, num_splits, pandas_df) + [index] @ray.remote From b7e76869977eff0394c9929cd864585ff96af109 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 12 Sep 2018 12:47:45 -0700 Subject: [PATCH 03/10] Fix performance bug --- modin/pandas/io.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 460bfbdae26..c3dad677ae3 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -166,13 +166,12 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): index_col = kwargs.get("index_col", None) print("Submission ended") if index_col is None: - new_index = pandas.RangeIndex(len([i for idx in index_ids for i in ray.get(idx)])) + new_index = pandas.RangeIndex(sum(ray.get(index_ids))) else: + new_index_ids = get_index.remote([empty_pd_df.index.name], *index_ids) + new_index = ray.get(new_index_ids) - new_index_ids = ray.get(index_ids) - new_index = new_index_ids[0] - new_index.append(new_index_ids[1:]) - print("Index endd") + print("Index ended") new_manager = RayPandasDataManager._from_old_block_partitions(np.array(partition_ids), new_index, column_names) print("New manager created") new_df = DataFrame(data_manager=new_manager) @@ -605,9 +604,13 @@ def _read_csv_with_offset(fname, num_splits, start, end, kwargs={}, header=b''): to_read = header + bio.read(end - start) bio.close() pandas_df = pandas.read_csv(BytesIO(to_read), **kwargs) - index = pandas_df.index - # Partitions must have RangeIndex - pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) + if kwargs["index_col"] is not None: + index = pandas_df.index + # Partitions must have RangeIndex + pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) + else: + index = len(pandas_df) + return split_result_of_axis_func_pandas(1, num_splits, pandas_df) + [index] From 13ccd3e6cf6440cf0818a44adfc981025a514e10 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 12 Sep 2018 13:09:43 -0700 Subject: [PATCH 04/10] Debugging performance --- modin/pandas/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index c3dad677ae3..5cd13bf6dd6 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -149,7 +149,7 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): index_ids = [] total_bytes = os.path.getsize(filepath) chunk_size = max(1, (total_bytes - f.tell()) // npartitions) - num_splits = RayBlockPartitions._compute_num_partitions() + num_splits = min(len(column_names), RayBlockPartitions._compute_num_partitions()) while f.tell() < total_bytes: start = f.tell() From 8b451982d3a5b7023514cdb0301ff9bce0df0cf9 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 12 Sep 2018 14:28:09 -0700 Subject: [PATCH 05/10] Debugging performance on large IO --- modin/data_management/data_manager.py | 14 ++++++-------- modin/pandas/io.py | 8 ++++---- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index e1c8ce65e03..d473c0fcb7c 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -56,7 +56,7 @@ def _set_dtype(self, dtypes): _columns_cache = None def _get_index(self): - return self._index_cache.index + return self._index_cache def _get_columns(self): return self._columns_cache.index @@ -65,14 +65,12 @@ def _set_index(self, new_index): if self._index_cache is not None: self._index_cache.index = new_index else: - self._index_cache = pandas.Series(index=new_index) - + self._index_cache = new_index# pandas.Series(index=new_index) def _set_columns(self, new_columns): if self._columns_cache is not None: self._columns_cache.index = new_columns else: self._columns_cache = pandas.Series(index=new_columns) - columns = property(_get_columns, _set_columns) index = property(_get_index, _set_index) @@ -1011,20 +1009,20 @@ def head(self, n): # ensure that we extract the correct data on each node. The index # on a transposed manager is already set to the correct value, so # we need to only take the head of that instead of re-transposing. - result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self.dtypes) + result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self._dtype_cache) result._is_transposed = True else: - result = cls(self.data.take(0, n), self.index[:n], self.columns, self.dtypes) + result = cls(self.data.take(0, n), self.index[:n], self.columns, self._dtype_cache) return result def tail(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self.dtypes) + result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self._dtype_cache) result._is_transposed = True else: - result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self.dtypes) + result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self._dtype_cache) return result def front(self, n): diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 5cd13bf6dd6..94ded77515a 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -20,7 +20,7 @@ from ..data_management.partitioning.partition_collections import RayBlockPartitions from ..data_management.partitioning.remote_partition import RayRemotePartition from ..data_management.partitioning.axis_partition import split_result_of_axis_func_pandas -from ..data_management.data_manager import RayPandasDataManager +from ..data_management.data_manager import PandasDataManager PQ_INDEX_REGEX = re.compile('__index_level_\d+__') @@ -160,8 +160,8 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): kwargs["skipfooter"] = skipfooter kwargs["skip_footer"] = skip_footer - partition_id = _read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1) - partition_ids.append(partition_id[:-1]) + partition_id = (_read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1)) + partition_ids.append([RayRemotePartition(obj) for obj in partition_id[:-1]]) index_ids.append(partition_id[-1]) index_col = kwargs.get("index_col", None) print("Submission ended") @@ -172,7 +172,7 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): new_index = ray.get(new_index_ids) print("Index ended") - new_manager = RayPandasDataManager._from_old_block_partitions(np.array(partition_ids), new_index, column_names) + new_manager = PandasDataManager(RayBlockPartitions(np.array(partition_ids)), new_index, column_names) print("New manager created") new_df = DataFrame(data_manager=new_manager) print("New frame created") From 8b1a07d4b4ef5e6db6540653a69278a3f37ad5ac Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Thu, 13 Sep 2018 09:43:25 -0700 Subject: [PATCH 06/10] Making some performance tuning changes --- .../partitioning/partition_collections.py | 42 ++++++++++++++++++- .../partitioning/remote_partition.py | 4 +- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 7c7fbbb02b8..ac269d3786f 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -3,6 +3,7 @@ from __future__ import print_function import numpy as np +import ray import pandas from .remote_partition import RayRemotePartition @@ -99,7 +100,7 @@ def block_lengths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same length in a # row of blocks. - self._lengths_cache = [obj.length for obj in self.partitions.T[0]] + self._lengths_cache = [obj.length.get() for obj in self.partitions.T[0]] return self._lengths_cache # Widths of the blocks @@ -116,7 +117,7 @@ def block_widths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same width in a # column of blocks. - self._widths_cache = [obj.width for obj in self.partitions[0]] + self._widths_cache = [obj.width.get() for obj in self.partitions[0]] return self._widths_cache def full_reduce(self, map_func, reduce_func, axis): @@ -685,6 +686,43 @@ class RayBlockPartitions(BlockPartitions): def __init__(self, partitions): self.partitions = partitions + # We override these for performance reasons. + # Lengths of the blocks + _lengths_cache = None + + # These are set up as properties so that we only use them when we need + # them. We also do not want to trigger this computation on object creation. + @property + def block_lengths(self): + """Gets the lengths of the blocks. + + Note: This works with the property structure `_lengths_cache` to avoid + having to recompute these values each time they are needed. + """ + if self._lengths_cache is None: + # The first column will have the correct lengths. We have an + # invariant that requires that all blocks be the same length in a + # row of blocks. + self._lengths_cache = ray.get([obj.length.oid for obj in self.partitions.T[0]]) + return self._lengths_cache + + # Widths of the blocks + _widths_cache = None + + @property + def block_widths(self): + """Gets the widths of the blocks. + + Note: This works with the property structure `_widths_cache` to avoid + having to recompute these values each time they are needed. + """ + if self._widths_cache is None: + # The first column will have the correct lengths. We have an + # invariant that requires that all blocks be the same width in a + # column of blocks. + self._widths_cache = ray.get([obj.width.oid for obj in self.partitions[0]]) + return self._widths_cache + @property def column_partitions(self): """A list of `RayColumnPartition` objects.""" diff --git a/modin/data_management/partitioning/remote_partition.py b/modin/data_management/partitioning/remote_partition.py index ecca5041e9f..1a0b3a34ac9 100644 --- a/modin/data_management/partitioning/remote_partition.py +++ b/modin/data_management/partitioning/remote_partition.py @@ -117,7 +117,7 @@ def length(self): func = cls.length_extraction_fn() preprocessed_func = cls.preprocess_func(func) - self._length_cache = self.apply(preprocessed_func).get() + self._length_cache = self.apply(preprocessed_func) return self._length_cache @property @@ -127,7 +127,7 @@ def width(self): func = cls.width_extraction_fn() preprocessed_func = cls.preprocess_func(func) - self._width_cache = self.apply(preprocessed_func).get() + self._width_cache = self.apply(preprocessed_func) return self._width_cache From b013cae48bcfd949713be495665f208536d1609d Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Thu, 13 Sep 2018 11:27:55 -0700 Subject: [PATCH 07/10] Cleaning up and adding performance improvements --- modin/pandas/io.py | 125 ++++++++++++--------------------------------- 1 file changed, 32 insertions(+), 93 deletions(-) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 94ded77515a..4079ac7b227 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -5,6 +5,7 @@ import pandas from pandas.io.common import _infer_compression +import inspect from io import BytesIO import os import py @@ -63,11 +64,11 @@ def read_parquet(path, engine='auto', columns=None, **kwargs): # CSV def _skip_header(f, kwargs={}): lines_read = 0 - comment = kwargs["comment"] - skiprows = kwargs["skiprows"] - encoding = kwargs["encoding"] - header = kwargs["header"] - names = kwargs["names"] + comment = kwargs.get("comment", None) + skiprows = kwargs.get("skiprows", None) + encoding = kwargs.get("encoding", None) + header = kwargs.get("header", "infer") + names = kwargs.get("names", None) if header is None: return lines_read @@ -110,7 +111,7 @@ def _skip_header(f, kwargs={}): return lines_read -def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): +def _read_csv_from_file_pandas_on_ray(filepath, npartitions, kwargs={}): """Constructs a DataFrame from a CSV file. Args: @@ -125,15 +126,14 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): filepath, **dict(kwargs, nrows=0, skipfooter=0, skip_footer=0)) column_names = empty_pd_df.columns - skipfooter = kwargs["skipfooter"] - skip_footer = kwargs["skip_footer"] + skipfooter = kwargs.get("skipfooter", None) partition_kwargs = dict( kwargs, header=None, names=column_names, skipfooter=0, skip_footer=0) with open(filepath, "rb") as f: # Get the BOM if necessary prefix = b"" - if kwargs["encoding"] is not None: + if kwargs.get("encoding", None) is not None: prefix = f.readline() partition_kwargs["skiprows"] = 1 f.seek(0, os.SEEK_SET) # Return to beginning of file @@ -156,41 +156,26 @@ def _read_csv_from_file_pandas_backed_ray(filepath, npartitions, kwargs={}): f.seek(chunk_size, os.SEEK_CUR) f.readline() # Read a whole number of lines - if f.tell() >= total_bytes: - kwargs["skipfooter"] = skipfooter - kwargs["skip_footer"] = skip_footer - - partition_id = (_read_csv_with_offset._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1)) + partition_id = (_read_csv_with_offset_pandas_on_ray._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1)) partition_ids.append([RayRemotePartition(obj) for obj in partition_id[:-1]]) index_ids.append(partition_id[-1]) + index_col = kwargs.get("index_col", None) - print("Submission ended") if index_col is None: new_index = pandas.RangeIndex(sum(ray.get(index_ids))) else: new_index_ids = get_index.remote([empty_pd_df.index.name], *index_ids) new_index = ray.get(new_index_ids) - print("Index ended") new_manager = PandasDataManager(RayBlockPartitions(np.array(partition_ids)), new_index, column_names) - print("New manager created") - new_df = DataFrame(data_manager=new_manager) - print("New frame created") - return new_df - - # Construct index - # index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \ - # if kwargs["index_col"] is not None else None - # - # df = DataFrame(row_partitions=partition_ids, columns=names, index=index_id) - # - # skipfooter = kwargs["skipfooter"] or kwargs["skip_footer"] - # if skipfooter: - # df = df.drop(df.index[-skipfooter:]) - # if kwargs["squeeze"] and len(df.columns) == 1: - # return df[df.columns[0]] - # - # return df + df = DataFrame(data_manager=new_manager) + + if skipfooter: + df = df.drop(df.index[-skipfooter:]) + if kwargs.get("squeeze", False) and len(df.columns) == 1: + return df[df.columns[0]] + + return df def _read_csv_from_pandas(filepath_or_buffer, kwargs): @@ -269,62 +254,16 @@ def read_csv(filepath_or_buffer, We only support local files for now. kwargs: Keyword arguments in pandas::from_csv """ - - kwargs = { - 'sep': sep, - 'delimiter': delimiter, - 'header': header, - 'names': names, - 'index_col': index_col, - 'usecols': usecols, - 'squeeze': squeeze, - 'prefix': prefix, - 'mangle_dupe_cols': mangle_dupe_cols, - 'dtype': dtype, - 'engine': engine, - 'converters': converters, - 'true_values': true_values, - 'false_values': false_values, - 'skipinitialspace': skipinitialspace, - 'skiprows': skiprows, - 'nrows': nrows, - 'na_values': na_values, - 'keep_default_na': keep_default_na, - 'na_filter': na_filter, - 'verbose': verbose, - 'skip_blank_lines': skip_blank_lines, - 'parse_dates': parse_dates, - 'infer_datetime_format': infer_datetime_format, - 'keep_date_col': keep_date_col, - 'date_parser': date_parser, - 'dayfirst': dayfirst, - 'iterator': iterator, - 'chunksize': chunksize, - 'compression': compression, - 'thousands': thousands, - 'decimal': decimal, - 'lineterminator': lineterminator, - 'quotechar': quotechar, - 'quoting': quoting, - 'escapechar': escapechar, - 'comment': comment, - 'encoding': encoding, - 'dialect': dialect, - 'tupleize_cols': tupleize_cols, - 'error_bad_lines': error_bad_lines, - 'warn_bad_lines': warn_bad_lines, - 'skipfooter': skipfooter, - 'skip_footer': skip_footer, - 'doublequote': doublequote, - 'delim_whitespace': delim_whitespace, - 'as_recarray': as_recarray, - 'compact_ints': compact_ints, - 'use_unsigned': use_unsigned, - 'low_memory': low_memory, - 'buffer_lines': buffer_lines, - 'memory_map': memory_map, - 'float_precision': float_precision, - } + # The intention of the inspection code is to reduce the amount of + # communication we have to do between processes and nodes. We take a quick + # pass over the arguments and remove those that are default values so we + # don't have to serialize and send them to the workers. Because the + # arguments list is so long, this does end up saving time based on the + # number of nodes in the cluster. + frame = inspect.currentframe() + _, _, _, kwargs = inspect.getargvalues(frame) + _, _, _, defaults, _, _, _ = inspect.getfullargspec(read_csv) + kwargs = {kw: kwargs[kw] for kw in kwargs if kw in defaults and kwargs[kw] != defaults[kw]} if isinstance(filepath_or_buffer, str): if not os.path.exists(filepath_or_buffer): @@ -385,7 +324,7 @@ def read_csv(filepath_or_buffer, return _read_csv_from_pandas(filepath_or_buffer, kwargs) - return _read_csv_from_file_pandas_backed_ray(filepath_or_buffer, get_npartitions(), kwargs) + return _read_csv_from_file_pandas_on_ray(filepath_or_buffer, get_npartitions(), kwargs) def read_json(path_or_buf=None, @@ -598,13 +537,13 @@ def get_index(index_name, *partition_indices): @ray.remote -def _read_csv_with_offset(fname, num_splits, start, end, kwargs={}, header=b''): +def _read_csv_with_offset_pandas_on_ray(fname, num_splits, start, end, kwargs={}, header=b''): bio = open(fname, 'rb') bio.seek(start) to_read = header + bio.read(end - start) bio.close() pandas_df = pandas.read_csv(BytesIO(to_read), **kwargs) - if kwargs["index_col"] is not None: + if kwargs.get("index_col", None) is not None: index = pandas_df.index # Partitions must have RangeIndex pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) From 813c1d249d682210f3b0726b1eb5c695034b2145 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Thu, 13 Sep 2018 12:14:01 -0700 Subject: [PATCH 08/10] Cleaning up --- modin/data_management/data_manager.py | 37 ++++++++++++++++----------- modin/pandas/io.py | 6 +++-- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index d473c0fcb7c..beda45d802e 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -8,6 +8,7 @@ from pandas.compat import string_types from pandas.core.dtypes.cast import find_common_type from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like) +from pandas.core.index import _ensure_index from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions from .partitioning.remote_partition import RayRemotePartition @@ -26,7 +27,7 @@ def __init__(self, block_partitions_object, index, columns, dtypes=None): if dtypes is not None: self._dtype_cache = dtypes - # dtypes + # Index, columns and dtypes objects _dtype_cache = None def _get_dtype(self): @@ -45,35 +46,41 @@ def _set_dtype(self, dtypes): dtypes = property(_get_dtype, _set_dtype) - # Index and columns objects # These objects are currently not distributed. - # Note: These are more performant as pandas.Series objects than they are as - # pandas.DataFrame objects. - # - # _index_cache is a pandas.Series that holds the index _index_cache = None - # _columns_cache is a pandas.Series that holds the columns _columns_cache = None def _get_index(self): return self._index_cache def _get_columns(self): - return self._columns_cache.index + return self._columns_cache + + def _validate_set_axis(self, new_labels, old_labels): + new_labels = _ensure_index(new_labels) + old_len = len(old_labels) + new_len = len(new_labels) + if old_len != new_len: + raise ValueError('Length mismatch: Expected axis has %d elements, ' + 'new values have %d elements' % (old_len, new_len)) + return new_labels def _set_index(self, new_index): - if self._index_cache is not None: - self._index_cache.index = new_index + if self._index_cache is None: + self._index_cache = new_index else: - self._index_cache = new_index# pandas.Series(index=new_index) + new_index = self._validate_set_axis(new_index, self._index_cache) + self._index_cache = new_index + def _set_columns(self, new_columns): - if self._columns_cache is not None: - self._columns_cache.index = new_columns + if self._columns_cache is None: + self._columns_cache = new_columns else: - self._columns_cache = pandas.Series(index=new_columns) + new_columns = self._validate_set_axis(new_columns, self._index_cache) + self._columns_cache = new_columns + columns = property(_get_columns, _set_columns) index = property(_get_index, _set_index) - # END Index, columns, and dtypes objects def compute_index(self, axis, data_object, compute_diff=True): diff --git a/modin/pandas/io.py b/modin/pandas/io.py index 4079ac7b227..d42e7d2b0f4 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -156,7 +156,7 @@ def _read_csv_from_file_pandas_on_ray(filepath, npartitions, kwargs={}): f.seek(chunk_size, os.SEEK_CUR) f.readline() # Read a whole number of lines - partition_id = (_read_csv_with_offset_pandas_on_ray._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1)) + partition_id = _read_csv_with_offset_pandas_on_ray._submit(args=(filepath, num_splits, start, f.tell(), partition_kwargs_id, prefix_id), num_return_vals=num_splits + 1) partition_ids.append([RayRemotePartition(obj) for obj in partition_id[:-1]]) index_ids.append(partition_id[-1]) @@ -537,7 +537,7 @@ def get_index(index_name, *partition_indices): @ray.remote -def _read_csv_with_offset_pandas_on_ray(fname, num_splits, start, end, kwargs={}, header=b''): +def _read_csv_with_offset_pandas_on_ray(fname, num_splits, start, end, kwargs, header): bio = open(fname, 'rb') bio.seek(start) to_read = header + bio.read(end - start) @@ -548,6 +548,8 @@ def _read_csv_with_offset_pandas_on_ray(fname, num_splits, start, end, kwargs={} # Partitions must have RangeIndex pandas_df.index = pandas.RangeIndex(0, len(pandas_df)) else: + # We will use the lengths to build the index if we are not given an + # `index_col`. index = len(pandas_df) return split_result_of_axis_func_pandas(1, num_splits, pandas_df) + [index] From 942d72b2be9786b7bf9bfb46cee952fddc6dbfbb Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Thu, 13 Sep 2018 12:58:51 -0700 Subject: [PATCH 09/10] Addressing comments --- modin/pandas/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/io.py b/modin/pandas/io.py index d42e7d2b0f4..dfab7a730be 100644 --- a/modin/pandas/io.py +++ b/modin/pandas/io.py @@ -126,7 +126,7 @@ def _read_csv_from_file_pandas_on_ray(filepath, npartitions, kwargs={}): filepath, **dict(kwargs, nrows=0, skipfooter=0, skip_footer=0)) column_names = empty_pd_df.columns - skipfooter = kwargs.get("skipfooter", None) + skipfooter = kwargs.get("skipfooter", None) or kwargs.get("skip_footer", None) partition_kwargs = dict( kwargs, header=None, names=column_names, skipfooter=0, skip_footer=0) From 0cbd23059d90b9f4ea7d03f1a848fc4f92505974 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Thu, 13 Sep 2018 14:56:29 -0700 Subject: [PATCH 10/10] Addressing comments --- modin/data_management/data_manager.py | 6 +++--- .../data_management/partitioning/partition_collections.py | 8 ++++---- modin/data_management/partitioning/remote_partition.py | 2 -- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index beda45d802e..6581f811335 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -67,16 +67,16 @@ def _validate_set_axis(self, new_labels, old_labels): def _set_index(self, new_index): if self._index_cache is None: - self._index_cache = new_index + self._index_cache = _ensure_index(new_index) else: new_index = self._validate_set_axis(new_index, self._index_cache) self._index_cache = new_index def _set_columns(self, new_columns): if self._columns_cache is None: - self._columns_cache = new_columns + self._columns_cache = _ensure_index(new_columns) else: - new_columns = self._validate_set_axis(new_columns, self._index_cache) + new_columns = self._validate_set_axis(new_columns, self._columns_cache) self._columns_cache = new_columns columns = property(_get_columns, _set_columns) diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index ac269d3786f..5b5cb0aacec 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -100,7 +100,7 @@ def block_lengths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same length in a # row of blocks. - self._lengths_cache = [obj.length.get() for obj in self.partitions.T[0]] + self._lengths_cache = [obj.length().get() for obj in self.partitions.T[0]] return self._lengths_cache # Widths of the blocks @@ -117,7 +117,7 @@ def block_widths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same width in a # column of blocks. - self._widths_cache = [obj.width.get() for obj in self.partitions[0]] + self._widths_cache = [obj.width().get() for obj in self.partitions[0]] return self._widths_cache def full_reduce(self, map_func, reduce_func, axis): @@ -703,7 +703,7 @@ def block_lengths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same length in a # row of blocks. - self._lengths_cache = ray.get([obj.length.oid for obj in self.partitions.T[0]]) + self._lengths_cache = ray.get([obj.length().oid for obj in self.partitions.T[0]]) return self._lengths_cache # Widths of the blocks @@ -720,7 +720,7 @@ def block_widths(self): # The first column will have the correct lengths. We have an # invariant that requires that all blocks be the same width in a # column of blocks. - self._widths_cache = ray.get([obj.width.oid for obj in self.partitions[0]]) + self._widths_cache = ray.get([obj.width().oid for obj in self.partitions[0]]) return self._widths_cache @property diff --git a/modin/data_management/partitioning/remote_partition.py b/modin/data_management/partitioning/remote_partition.py index 1a0b3a34ac9..0923f2bab73 100644 --- a/modin/data_management/partitioning/remote_partition.py +++ b/modin/data_management/partitioning/remote_partition.py @@ -110,7 +110,6 @@ def width_extraction_fn(cls): _length_cache = None _width_cache = None - @property def length(self): if self._length_cache is None: cls = type(self) @@ -120,7 +119,6 @@ def length(self): self._length_cache = self.apply(preprocessed_func) return self._length_cache - @property def width(self): if self._width_cache is None: cls = type(self)