338 changes: 197 additions & 141 deletions python/ray/dataframe/io.py
@@ -5,6 +5,8 @@
from itertools import chain
from io import BytesIO
import os
import pathlib
import py
import re
import warnings

@@ -84,45 +86,104 @@ def _split_df(pd_df, chunksize):


# CSV
def _compute_offset(fn, npartitions, ignore_first_line=False):
"""
Calculate the currect bytes offsets for a csv file.
Return a list of (start, end) tuple where the end == \n or EOF.
"""
total_bytes = os.path.getsize(fn)
bio = open(fn, 'rb')
if ignore_first_line:
start = len(bio.readline())
chunksize = (total_bytes - start) // npartitions
else:
start = 0
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1

offsets = []
while start < total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
# The position of the \n we just crossed.
new_line_cursor = start + total_offset - 1
offsets.append((start, new_line_cursor))
start = new_line_cursor + 1

bio.close()
return offsets
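
For intuition, a minimal sketch of how these offsets behave; the temp file and the two-partition split are illustrative, not part of this diff:

    # Illustrative only: exercise _compute_offset on a throwaway file.
    import tempfile

    with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
        tmp.write(b"a,1\nb,2\nc,3\nd,4\n")
        path = tmp.name

    for start, end in _compute_offset(path, 2):
        with open(path, "rb") as f:
            f.seek(start)
            # Each range ends on a newline (or past EOF for the last chunk),
            # so no partition ever splits a CSV record.
            print(f.read(end - start + 1))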


def _get_firstline(file_path):
bio = open(file_path, 'rb')
first = bio.readline()
bio.close()
    return first


def _skip_header(fp, kwargs={}):
lines_read = 0
comment = kwargs["comment"]
skiprows = kwargs["skiprows"]
encoding = kwargs["encoding"]
header = kwargs["header"]
names = kwargs["names"]

if header is None:
return lines_read

if header == "infer":
if names is not None:
return lines_read
else:
header = 0

# Skip lines before the header
if isinstance(skiprows, int):
lines_read += skiprows
for _ in range(skiprows):
fp.readline()
skiprows = None

header_lines = header + 1 if isinstance(header, int) else max(header) + 1

header_lines_skipped = 0
for line in fp:
lines_read += 1
skip = False
if not skip and comment is not None:
if encoding is not None:
skip |= line.decode(encoding)[0] == comment
else:
skip |= line.decode()[0] == comment
if not skip and callable(skiprows):
skip |= skiprows(lines_read)
elif not skip and hasattr(skiprows, "__contains__"):
skip |= lines_read in skiprows

if not skip:
header_lines_skipped += 1
if header_lines_skipped == header_lines:
return lines_read

return lines_read
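
A quick sketch of the return contract, assuming a file whose first two lines are junk and whose header is inferred (the file name and contents are hypothetical; every key _skip_header reads must be present in kwargs):

    # Illustrative only: with skiprows=2 and a one-line inferred header,
    # _skip_header consumes three lines and reports how many it read.
    with open("example.csv", "wb") as f:
        f.write(b"junk\njunk\ncol_a,col_b\n1,2\n")

    with open("example.csv", "rb") as f:
        consumed = _skip_header(f, {"comment": None, "skiprows": 2,
                                    "encoding": None, "header": "infer",
                                    "names": None})
        assert consumed == 3             # two skipped lines plus the header
        assert f.readline() == b"1,2\n"  # pointer sits at the first data row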


def _infer_column(first_line, kwargs={}):
    return pd.read_csv(BytesIO(first_line), **kwargs).columns


def _read_csv_from_file(filepath, npartitions, kwargs={}):
    """Constructs a DataFrame from a CSV file.

    Args:
        filepath (str): path to the CSV file.
        npartitions (int): number of partitions for the DataFrame.
        kwargs (dict): args excluding filepath provided to read_csv.

    Returns:
        DataFrame constructed from CSV file.
    """
empty_pd_df = pd.read_csv(filepath, **dict(kwargs, nrows=0))
names = empty_pd_df.columns
partition_kwargs = dict(kwargs, header=None, names=names)
with open(filepath, "rb") as f:
# Get the BOM if necessary
prefix = b""
if kwargs["encoding"] is not None:
prefix = f.readline()
partition_kwargs["skiprows"] = 1
f.seek(0, os.SEEK_SET) # Return to beginning of file

prefix_id = ray.put(prefix)
partition_kwargs_id = ray.put(partition_kwargs)

# Skip the header since we already have the header information
_skip_header(f, kwargs)

# Launch tasks to read partitions
partition_ids = []
index_ids = []
total_bytes = os.path.getsize(filepath)
chunk_size = max(1, (total_bytes - f.tell()) // npartitions)
while f.tell() < total_bytes:
start = f.tell()
f.seek(chunk_size, os.SEEK_CUR)
f.readline() # Read a whole number of lines
partition_id, index_id = _read_csv_with_offset._submit(
args=(filepath, start, f.tell(), partition_kwargs_id,
prefix_id),
num_return_vals=2)
partition_ids.append(partition_id)
index_ids.append(index_id)

# Construct index
index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \
if kwargs["index_col"] is not None else None

return DataFrame(row_partitions=partition_ids, columns=names,
index=index_id)
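
The partition loop above relies on a seek-then-readline trick to land every chunk boundary on a record edge. A standalone sketch of just that trick, with a hypothetical helper name and no Ray involved:

    def _chunk_boundaries(path, nchunks):
        """Sketch of the boundary alignment used in _read_csv_from_file."""
        total = os.path.getsize(path)
        stride = max(1, total // nchunks)
        bounds = []
        with open(path, "rb") as f:
            while f.tell() < total:
                start = f.tell()
                f.seek(stride, os.SEEK_CUR)  # jump roughly one chunk forward
                f.readline()                 # then snap to the next newline
                bounds.append((start, min(f.tell(), total)))
        return bounds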


@ray.remote
@@ -140,9 +201,26 @@ def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
return pd_df, index


def _read_csv_from_pandas(filepath_or_buffer, kwargs):
pd_obj = pd.read_csv(filepath_or_buffer, **kwargs)

if isinstance(pd_obj, pd.DataFrame):
return from_pandas(pd_obj, get_npartitions())
elif isinstance(pd_obj, pd.io.parsers.TextFileReader):
# Overwriting the read method should return a ray DataFrame for calls
# to __next__ and get_chunk
pd_read = pd_obj.read
pd_obj.read = lambda *args, **kwargs: \
from_pandas(pd_read(*args, **kwargs), get_npartitions())

return pd_obj
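
Because read is patched on the instance, chunked iteration transparently yields Ray DataFrames. A hypothetical use of this fallback (the file name and chunk size are made up):

    # TextFileReader.__next__ and get_chunk both route through read(),
    # so each chunk arrives as a Ray DataFrame rather than a pandas one.
    reader = _read_csv_from_pandas("big.csv", {"chunksize": 1000})
    for ray_chunk in reader:
        print(ray_chunk)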


@ray.remote
def get_index(*partition_indices):
    return partition_indices[0].append(partition_indices[1:])


def get_index(index_name, *partition_indices):
index = partition_indices[0].append(partition_indices[1:])
index.names = index_name
return index
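
For reference, a local sketch of what the remote call assembles, with hypothetical partition indices (pd.Index.append accepts a list of indices):

    import pandas as pd

    parts = [pd.Index([0, 1]), pd.Index([2, 3])]
    combined = parts[0].append(parts[1:])  # Int64Index([0, 1, 2, 3])
    combined.names = ["id"]                # restore the original index name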


def read_csv(filepath_or_buffer,
@@ -189,14 +267,9 @@ def read_csv(filepath_or_buffer,
error_bad_lines=True,
warn_bad_lines=True,
skipfooter=0,
skip_footer=0,
doublequote=True,
delim_whitespace=False,
as_recarray=None,
compact_ints=None,
use_unsigned=None,
low_memory=True,
buffer_lines=None,
memory_map=False,
float_precision=None):
"""Read csv file from local disk.
@@ -208,110 +281,93 @@ def read_csv(filepath_or_buffer,
        kwargs: Keyword arguments in pandas.read_csv
"""

kwargs = {
'sep': sep,
'delimiter': delimiter,
'header': header,
'names': names,
'index_col': index_col,
'usecols': usecols,
'squeeze': squeeze,
'prefix': prefix,
'mangle_dupe_cols': mangle_dupe_cols,
'dtype': dtype,
'engine': engine,
'converters': converters,
'true_values': true_values,
'false_values': false_values,
'skipinitialspace': skipinitialspace,
'skiprows': skiprows,
'nrows': nrows,
'na_values': na_values,
'keep_default_na': keep_default_na,
'na_filter': na_filter,
'verbose': verbose,
'skip_blank_lines': skip_blank_lines,
'parse_dates': parse_dates,
'infer_datetime_format': infer_datetime_format,
'keep_date_col': keep_date_col,
'date_parser': date_parser,
'dayfirst': dayfirst,
'iterator': iterator,
'chunksize': chunksize,
'compression': compression,
'thousands': thousands,
'decimal': decimal,
'lineterminator': lineterminator,
'quotechar': quotechar,
'quoting': quoting,
'escapechar': escapechar,
'comment': comment,
'encoding': encoding,
'dialect': dialect,
'tupleize_cols': tupleize_cols,
'error_bad_lines': error_bad_lines,
'warn_bad_lines': warn_bad_lines,
'skipfooter': skipfooter,
'skip_footer': skip_footer,
'doublequote': doublequote,
'delim_whitespace': delim_whitespace,
'as_recarray': as_recarray,
'compact_ints': compact_ints,
'use_unsigned': use_unsigned,
'low_memory': low_memory,
'buffer_lines': buffer_lines,
'memory_map': memory_map,
'float_precision': float_precision,
}

# Default to Pandas read_csv for non-serializable objects
if not isinstance(filepath_or_buffer, str) or \
_infer_compression(filepath_or_buffer, compression) is not None:

warnings.warn("Defaulting to Pandas implementation",
kwargs = dict(
sep=sep,
delimiter=delimiter,
header=header,
names=names,
index_col=index_col,
usecols=usecols,
squeeze=squeeze,
prefix=prefix,
mangle_dupe_cols=mangle_dupe_cols,
dtype=dtype,
engine=engine,
converters=converters,
true_values=true_values,
false_values=false_values,
skipinitialspace=skipinitialspace,
skiprows=skiprows,
nrows=nrows,
na_values=na_values,
keep_default_na=keep_default_na,
na_filter=na_filter,
verbose=verbose,
skip_blank_lines=skip_blank_lines,
parse_dates=parse_dates,
infer_datetime_format=infer_datetime_format,
keep_date_col=keep_date_col,
date_parser=date_parser,
dayfirst=dayfirst,
iterator=iterator,
chunksize=chunksize,
compression=compression,
thousands=thousands,
decimal=decimal,
lineterminator=lineterminator,
quotechar=quotechar,
quoting=quoting,
escapechar=escapechar,
comment=comment,
encoding=encoding,
dialect=dialect,
tupleize_cols=tupleize_cols,
error_bad_lines=error_bad_lines,
warn_bad_lines=warn_bad_lines,
skipfooter=skipfooter,
doublequote=doublequote,
delim_whitespace=delim_whitespace,
low_memory=low_memory,
memory_map=memory_map,
float_precision=float_precision)

if isinstance(filepath_or_buffer, str):
if not os.path.exists(filepath_or_buffer):
warnings.warn(("File not found on disk. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)
elif not isinstance(filepath_or_buffer, pathlib.Path) and \
not isinstance(filepath_or_buffer, py.path.local):
warnings.warn(("Reading from buffer. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    pd_obj = pd.read_csv(filepath_or_buffer, **kwargs)
    if isinstance(pd_obj, pd.DataFrame):
        return from_pandas(pd_obj, get_npartitions())

    return pd_obj
    if _infer_compression(filepath_or_buffer, compression) is not None:
        warnings.warn(("Compression detected. "
                       "Defaulting to Pandas implementation."),
                      PendingDeprecationWarning)

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    filepath = filepath_or_buffer

# TODO: handle case where header is a list of lines
first_line = _get_firstline(filepath)
columns = _infer_column(first_line, kwargs=kwargs)
if header is None or (header == "infer" and names is not None):
first_line = b""
ignore_first_line = False
else:
ignore_first_line = True

offsets = _compute_offset(filepath, get_npartitions(),
ignore_first_line=ignore_first_line)

# Serialize objects to speed up later use in remote tasks
first_line_id = ray.put(first_line)
kwargs_id = ray.put(kwargs)

df_obj_ids = []
index_obj_ids = []
for start, end in offsets:
if start != 0:
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id, first_line_id),
num_return_vals=2)
else:
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id),
num_return_vals=2)
df_obj_ids.append(df)
index_obj_ids.append(index)
if chunksize is not None:
warnings.warn(("Reading chunks from a file. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

if skiprows is not None and not isinstance(skiprows, int):
warnings.warn(("Defaulting to Pandas implementation. To speed up "
"read_csv through the Pandas on Ray implementation, "
"comment the rows to skip instead."))

        return _read_csv_from_pandas(filepath_or_buffer, kwargs)

    index = get_index.remote(*index_obj_ids) if index_col is not None else None

return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index)
return _read_csv_from_file(filepath_or_buffer, get_npartitions(), kwargs)
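
Taken together, read_csv only partitions the plain local-file case and defers to pandas everywhere else. A hypothetical call for each path (file names are made up):

    df = read_csv("data.csv")                     # local, uncompressed: partitioned read
    reader = read_csv("data.csv", chunksize=500)  # chunked: pandas fallback, patched reader
    gz_df = read_csv("data.csv.gz")               # compressed: pandas fallback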


def read_json(path_or_buf=None,