2 changes: 1 addition & 1 deletion modin/pandas/dataframe.py
@@ -325,7 +325,7 @@ def __repr__(self):
return repr(self._repr_helper_())
# The split here is so that we don't repr pandas row lengths.
result = self._repr_helper_()
-        final_result = repr(result).rsplit("\n\n", maxsplit=1)[0] + \
+        final_result = repr(result).rsplit("\n\n", 1)[0] + \
"\n\n[{0} rows x {1} columns]".format(len(self.index),
len(self.columns))
return final_result
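The one-line change above appears to be a Python 2 compatibility fix: str.rsplit in Python 2 rejects the maxsplit keyword argument, while the positional form behaves identically on both interpreters. A minimal sketch of the difference (the sample string is made up):

text = "col_a col_b\n\n[100 rows x 2 columns]"

# Portable on Python 2 and 3: maxsplit passed positionally.
body = text.rsplit("\n\n", 1)[0]
print(body)  # "col_a col_b"

# Raises TypeError("rsplit() takes no keyword arguments") on Python 2:
# body = text.rsplit("\n\n", maxsplit=1)[0]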
274 changes: 190 additions & 84 deletions modin/pandas/io.py
@@ -7,6 +7,7 @@

from io import BytesIO
import os
import py
from pyarrow.parquet import ParquetFile
import pyarrow.parquet as pq
import re
@@ -63,51 +64,127 @@ def _read_parquet_column(path, column, kwargs={}):


# CSV
def _compute_offset(fn, npartitions, ignore_first_line=False):
"""
    Calculate the correct byte offsets for a CSV file.
    Return a list of (start, end) tuples where each end is at a \n or EOF.
"""
total_bytes = os.path.getsize(fn)
bio = open(fn, 'rb')
if ignore_first_line:
start = len(bio.readline())
chunksize = (total_bytes - start) // npartitions
else:
start = 0
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1

offsets = []
while start < total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
# The position of the \n we just crossed.
new_line_cursor = start + total_offset - 1
offsets.append((start, new_line_cursor))
start = new_line_cursor + 1

bio.close()
return offsets


def _get_firstline(file_path):
bio = open(file_path, 'rb')
first = bio.readline()
bio.close()
return first
def _skip_header(f, kwargs={}):
lines_read = 0
comment = kwargs["comment"]
skiprows = kwargs["skiprows"]
encoding = kwargs["encoding"]
header = kwargs["header"]
names = kwargs["names"]

if header is None:
return lines_read

if header == "infer":
if names is not None:
return lines_read
else:
header = 0

# Skip lines before the header
if isinstance(skiprows, int):
lines_read += skiprows
for _ in range(skiprows):
f.readline()
skiprows = None

header_lines = header + 1 if isinstance(header, int) else max(header) + 1

header_lines_skipped = 0
# Python 2 files use a read-ahead buffer which breaks our use of tell()
for line in iter(f.readline, ""):
lines_read += 1
skip = False
if not skip and comment is not None:
if encoding is not None:
skip |= line.decode(encoding)[0] == comment
else:
skip |= line.decode()[0] == comment
if not skip and callable(skiprows):
skip |= skiprows(lines_read)
elif not skip and hasattr(skiprows, "__contains__"):
skip |= lines_read in skiprows

if not skip:
header_lines_skipped += 1
if header_lines_skipped == header_lines:
return lines_read

return lines_read
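_skip_header consumes the header (plus any skipped or commented lines before it) and leaves the file handle at the first data row, returning how many lines it read; each partition can then be parsed with header=None. A hypothetical usage sketch, assuming a plain two-column CSV and default read_csv options (the BytesIO input and the kwargs dict here are illustrative, not from the PR):

import io

# Illustrative only: a two-column CSV with a single inferred header row.
f = io.BytesIO(b"a,b\n1,2\n3,4\n")
kwargs = {"comment": None, "skiprows": None, "encoding": None,
          "header": "infer", "names": None}

lines_read = _skip_header(f, kwargs)
assert lines_read == 1            # one header line consumed
assert f.read() == b"1,2\n3,4\n"  # handle now points at the data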


def _infer_column(first_line, kwargs={}):
    return pandas.read_csv(BytesIO(first_line), **kwargs).columns


def _read_csv_from_file(filepath, npartitions, kwargs={}):
    """Constructs a DataFrame from a CSV file.

    Args:
        filepath (str): path to the CSV file.
        npartitions (int): number of partitions for the DataFrame.
        kwargs (dict): args excluding filepath provided to read_csv.

    Returns:
        DataFrame or Series constructed from CSV file.
    """
empty_pd_df = pandas.read_csv(
filepath, **dict(kwargs, nrows=0, skipfooter=0, skip_footer=0))
names = empty_pd_df.columns

skipfooter = kwargs["skipfooter"]
skip_footer = kwargs["skip_footer"]

partition_kwargs = dict(
kwargs, header=None, names=names, skipfooter=0, skip_footer=0)
with open(filepath, "rb") as f:
# Get the BOM if necessary
prefix = b""
if kwargs["encoding"] is not None:
prefix = f.readline()
partition_kwargs["skiprows"] = 1
f.seek(0, os.SEEK_SET) # Return to beginning of file

prefix_id = ray.put(prefix)
partition_kwargs_id = ray.put(partition_kwargs)

# Skip the header since we already have the header information
_skip_header(f, kwargs)

# Launch tasks to read partitions
partition_ids = []
index_ids = []
total_bytes = os.path.getsize(filepath)
chunk_size = max(1, (total_bytes - f.tell()) // npartitions)
while f.tell() < total_bytes:
start = f.tell()
f.seek(chunk_size, os.SEEK_CUR)
f.readline() # Read a whole number of lines

if f.tell() >= total_bytes:
kwargs["skipfooter"] = skipfooter
kwargs["skip_footer"] = skip_footer

partition_id, index_id = _read_csv_with_offset._submit(
args=(filepath, start, f.tell(), partition_kwargs_id,
prefix_id),
num_return_vals=2)
partition_ids.append(partition_id)
index_ids.append(index_id)

# Construct index
index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \
if kwargs["index_col"] is not None else None

df = DataFrame(row_partitions=partition_ids, columns=names, index=index_id)

skipfooter = kwargs["skipfooter"] or kwargs["skip_footer"]
if skipfooter:
df = df.drop(df.index[-skipfooter:])
if kwargs["squeeze"] and len(df.columns) == 1:
return df[df.columns[0]]

return df
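The loop above relies on a seek-then-readline trick: jump about chunk_size bytes forward, then read to the next newline, so every partition ends on a row boundary and no record is split across Ray tasks. A self-contained sketch of just that logic, assuming a local uncompressed file with newline-terminated rows (byte_ranges is a hypothetical helper, not part of the PR, and it omits the header handling above):

import os

def byte_ranges(path, npartitions):
    """Split a file into (start, end) byte ranges on line boundaries."""
    total_bytes = os.path.getsize(path)
    chunk_size = max(1, total_bytes // npartitions)
    ranges = []
    with open(path, "rb") as f:
        while f.tell() < total_bytes:
            start = f.tell()
            f.seek(chunk_size, os.SEEK_CUR)  # jump ~one chunk forward
            f.readline()                     # extend to the next "\n"
            ranges.append((start, min(f.tell(), total_bytes)))
    return ranges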


@ray.remote
def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078

bio = open(fn, 'rb')
bio.seek(start)
to_read = header + bio.read(end - start)
@@ -119,9 +196,26 @@ def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
return pandas_df, index


def _read_csv_from_pandas(filepath_or_buffer, kwargs):
pd_obj = pandas.read_csv(filepath_or_buffer, **kwargs)

if isinstance(pd_obj, pandas.DataFrame):
return from_pandas(pd_obj, get_npartitions())
elif isinstance(pd_obj, pandas.io.parsers.TextFileReader):
# Overwriting the read method should return a ray DataFrame for calls
# to __next__ and get_chunk
pd_read = pd_obj.read
pd_obj.read = lambda *args, **kwargs: \
from_pandas(pd_read(*args, **kwargs), get_npartitions())

return pd_obj
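The patch works because pandas' TextFileReader routes both __next__ and get_chunk through its read method, so replacing read on the instance is enough for every chunk to come back as a Ray DataFrame. A hedged usage sketch (the file name is hypothetical):

# Illustrative only: with chunksize set, pandas.read_csv returns a
# TextFileReader rather than a DataFrame.
reader = _read_csv_from_pandas("data.csv", {"chunksize": 500})

for chunk in reader:    # __next__ -> get_chunk -> patched read
    print(type(chunk))  # a Ray-backed DataFrame, not a pandas one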


@ray.remote
def get_index(*partition_indices):
return partition_indices[0].append(partition_indices[1:])
def get_index(index_name, *partition_indices):
index = partition_indices[0].append(partition_indices[1:])
index.names = index_name
return index
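get_index stitches the per-partition indices back into a single index and restores the name captured from the initial empty pandas read, since each partition is parsed independently and loses it. The same logic run locally, with made-up values:

import pandas

# Sketch: two per-partition indices, appended and renamed.
parts = [pandas.Index([0, 1]), pandas.Index([2, 3])]
index = parts[0].append(parts[1:])
index.names = ["id"]  # restore the name recorded up front

print(list(index))    # [0, 1, 2, 3]
print(index.names)    # ['id']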


def read_csv(filepath_or_buffer,
@@ -179,7 +273,6 @@ def read_csv(filepath_or_buffer,
memory_map=False,
float_precision=None):
"""Read csv file from local disk.

Args:
filepath:
The filepath of the csv file.
@@ -243,53 +336,66 @@ def read_csv(filepath_or_buffer,
'float_precision': float_precision,
}

    # Default to Pandas read_csv for non-serializable objects
    if not isinstance(filepath_or_buffer, str) or \
            _infer_compression(filepath_or_buffer, compression) is not None:
        warnings.warn("Defaulting to Pandas implementation",
                      PendingDeprecationWarning)

        pandas_obj = pandas.read_csv(filepath_or_buffer, **kwargs)
        if isinstance(pandas_obj, pandas.DataFrame):
            return from_pandas(pandas_obj, get_npartitions())

        return pandas_obj

    filepath = filepath_or_buffer

    # TODO: handle case where header is a list of lines
    first_line = _get_firstline(filepath)
    columns = _infer_column(first_line, kwargs=kwargs)
    if header is None or (header == "infer" and names is not None):
        first_line = b""
        ignore_first_line = False
    else:
        ignore_first_line = True

    offsets = _compute_offset(
        filepath, get_npartitions(), ignore_first_line=ignore_first_line)

    # Serialize objects to speed up later use in remote tasks
    first_line_id = ray.put(first_line)
    kwargs_id = ray.put(kwargs)

    df_obj_ids = []
    index_obj_ids = []
    for start, end in offsets:
        if start != 0:
            df, index = _read_csv_with_offset._submit(
                args=(filepath, start, end, kwargs_id, first_line_id),
                num_return_vals=2)
        else:
            df, index = _read_csv_with_offset._submit(
                args=(filepath, start, end, kwargs_id), num_return_vals=2)
        df_obj_ids.append(df)
        index_obj_ids.append(index)

    index = get_index.remote(*index_obj_ids) if index_col is not None else None

    return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index)
    if isinstance(filepath_or_buffer, str):
        if not os.path.exists(filepath_or_buffer):
            warnings.warn(("File not found on disk. "
                           "Defaulting to Pandas implementation."),
                          PendingDeprecationWarning)

            return _read_csv_from_pandas(filepath_or_buffer, kwargs)
    elif not isinstance(filepath_or_buffer, py.path.local):
        read_from_pandas = True

        # Pandas read_csv supports pathlib.Path
        try:
            import pathlib
            if isinstance(filepath_or_buffer, pathlib.Path):
                read_from_pandas = False
        except ImportError:
            pass

        if read_from_pandas:
            warnings.warn(("Reading from buffer. "
                           "Defaulting to Pandas implementation."),
                          PendingDeprecationWarning)

            return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    if _infer_compression(filepath_or_buffer, compression) is not None:
        warnings.warn(("Compression detected. "
                       "Defaulting to Pandas implementation."),
                      PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    if as_recarray:
        warnings.warn("Defaulting to Pandas implementation.",
                      PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    if chunksize is not None:
        warnings.warn(("Reading chunks from a file. "
                       "Defaulting to Pandas implementation."),
                      PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    if skiprows is not None and not isinstance(skiprows, int):
        warnings.warn(("Defaulting to Pandas implementation. To speed up "
                       "read_csv through the Pandas on Ray implementation, "
                       "comment the rows to skip instead."))

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    # TODO: replace this by reading lines from file.
    if nrows is not None:
        warnings.warn("Defaulting to Pandas implementation.",
                      PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    return _read_csv_from_file(filepath_or_buffer, get_npartitions(), kwargs)
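Taken together, read_csv now acts as a dispatcher: inputs the parallel reader cannot handle safely (a missing file, a buffer, compression, as_recarray, chunksize, a non-integer skiprows, nrows) fall back to _read_csv_from_pandas, and only a plain on-disk path reaches _read_csv_from_file. A hedged sketch of what that means for callers (the file name and process() are placeholders):

import modin.pandas as pd

# Plain path on disk: read in parallel across get_npartitions() chunks.
df = pd.read_csv("flights.csv")

# chunksize triggers the pandas fallback, but the returned reader is
# patched so each chunk still arrives as a Ray DataFrame.
for chunk in pd.read_csv("flights.csv", chunksize=1000):
    process(chunk)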


def read_json(path_or_buf=None,
30 changes: 30 additions & 0 deletions modin/pandas/test/test_io.py
@@ -309,6 +309,36 @@ def test_from_csv():
teardown_csv_file()


def test_from_csv_chunksize():
setup_csv_file(SMALL_ROW_SIZE)

# Tests __next__ and correctness of reader as an iterator
# Use a larger chunksize to read through the file more quickly
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=500)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=500)

for ray_df, pd_df in zip(rdf_reader, pd_reader):
assert ray_df_equals_pandas(ray_df, pd_df)

# Tests that get_chunk works correctly
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)

ray_df = rdf_reader.get_chunk(1)
pd_df = pd_reader.get_chunk(1)

assert ray_df_equals_pandas(ray_df, pd_df)

# Tests that read works correctly
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)

ray_df = rdf_reader.read()
pd_df = pd_reader.read()

assert ray_df_equals_pandas(ray_df, pd_df)


def test_from_json():
setup_json_file(SMALL_ROW_SIZE)
