diff --git a/modin/dataframe/io.py b/modin/dataframe/io.py
index 8b1bdbb4eef..4beca1c3e0d 100644
--- a/modin/dataframe/io.py
+++ b/modin/dataframe/io.py
@@ -5,6 +5,8 @@
 from itertools import chain
 from io import BytesIO
 import os
+import pathlib
+import py
 import re
 import warnings
 import pandas
@@ -84,45 +86,104 @@ def _split_df(pandas_df, chunksize):
 # CSV


-def _compute_offset(fn, npartitions, ignore_first_line=False):
-    """
-    Calculate the currect bytes offsets for a csv file.
-    Return a list of (start, end) tuple where the end == \n or EOF.
-    """
-    total_bytes = os.path.getsize(fn)
-    bio = open(fn, 'rb')
-    if ignore_first_line:
-        start = len(bio.readline())
-        chunksize = (total_bytes - start) // npartitions
-    else:
-        start = 0
-        chunksize = total_bytes // npartitions
-    if chunksize == 0:
-        chunksize = 1
-
-    offsets = []
-    while start < total_bytes:
-        bio.seek(chunksize, 1)  # Move forward {chunksize} bytes
-        extend_line = bio.readline()  # Move after the next \n
-        total_offset = chunksize + len(extend_line)
-        # The position of the \n we just crossed.
-        new_line_cursor = start + total_offset - 1
-        offsets.append((start, new_line_cursor))
-        start = new_line_cursor + 1
-
-    bio.close()
-    return offsets
-
-
-def _get_firstline(file_path):
-    bio = open(file_path, 'rb')
-    first = bio.readline()
-    bio.close()
-    return first


+def _skip_header(fp, kwargs={}):
+    lines_read = 0
+    comment = kwargs["comment"]
+    skiprows = kwargs["skiprows"]
+    encoding = kwargs["encoding"]
+    header = kwargs["header"]
+    names = kwargs["names"]
+
+    if header is None:
+        return lines_read
+
+    if header == "infer":
+        if names is not None:
+            return lines_read
+        else:
+            header = 0
+
+    # Skip lines before the header
+    if isinstance(skiprows, int):
+        lines_read += skiprows
+        for _ in range(skiprows):
+            fp.readline()
+        skiprows = None
+
+    header_lines = header + 1 if isinstance(header, int) else max(header) + 1
+
+    header_lines_skipped = 0
+    for line in fp:
+        lines_read += 1
+        skip = False
+        if not skip and comment is not None:
+            if encoding is not None:
+                skip |= line.decode(encoding)[0] == comment
+            else:
+                skip |= line.decode()[0] == comment
+        if not skip and callable(skiprows):
+            skip |= skiprows(lines_read)
+        elif not skip and hasattr(skiprows, "__contains__"):
+            skip |= lines_read in skiprows
+
+        if not skip:
+            header_lines_skipped += 1
+            if header_lines_skipped == header_lines:
+                return lines_read
+
+    return lines_read
+
+
+def _read_csv_from_file(filepath, npartitions, kwargs={}):
+    """Constructs a DataFrame from a CSV file.
+    Args:
+        filepath (str): path to the CSV file.
+        npartitions (int): number of partitions for the DataFrame.
+        kwargs (dict): args excluding filepath provided to read_csv.

-def _infer_column(first_line, kwargs={}):
-    return pandas.read_csv(BytesIO(first_line), **kwargs).columns
+    Returns:
+        DataFrame constructed from CSV file.
+ """ + empty_pd_df = pandas.read_csv(filepath, **dict(kwargs, nrows=0)) + names = empty_pd_df.columns + partition_kwargs = dict(kwargs, header=None, names=names) + with open(filepath, "rb") as f: + # Get the BOM if necessary + prefix = b"" + if kwargs["encoding"] is not None: + prefix = f.readline() + partition_kwargs["skiprows"] = 1 + f.seek(0, os.SEEK_SET) # Return to beginning of file + + prefix_id = ray.put(prefix) + partition_kwargs_id = ray.put(partition_kwargs) + + # Skip the header since we already have the header information + _skip_header(f, kwargs) + + # Launch tasks to read partitions + partition_ids = [] + index_ids = [] + total_bytes = os.path.getsize(filepath) + chunk_size = max(1, (total_bytes - f.tell()) // npartitions) + while f.tell() < total_bytes: + start = f.tell() + f.seek(chunk_size, os.SEEK_CUR) + f.readline() # Read a whole number of lines + partition_id, index_id = _read_csv_with_offset._submit( + args=(filepath, start, f.tell(), partition_kwargs_id, + prefix_id), + num_return_vals=2) + partition_ids.append(partition_id) + index_ids.append(index_id) + + # Construct index + index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \ + if kwargs["index_col"] is not None else None + + return DataFrame(row_partitions=partition_ids, columns=names, + index=index_id) @ray.remote @@ -140,9 +201,26 @@ def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''): return pandas_df, index +def _read_csv_from_pandas(filepath_or_buffer, kwargs): + pd_obj = pandas.read_csv(filepath_or_buffer, **kwargs) + + if isinstance(pd_obj, pandas.DataFrame): + return from_pandas(pd_obj, get_npartitions()) + elif isinstance(pd_obj, pandas.io.parsers.TextFileReader): + # Overwriting the read method should return a ray DataFrame for calls + # to __next__ and get_chunk + pd_read = pd_obj.read + pd_obj.read = lambda *args, **kwargs: \ + from_pandas(pd_read(*args, **kwargs), get_npartitions()) + + return pd_obj + + @ray.remote -def get_index(*partition_indices): - return partition_indices[0].append(partition_indices[1:]) +def get_index(index_name, *partition_indices): + index = partition_indices[0].append(partition_indices[1:]) + index.names = index_name + return index def read_csv(filepath_or_buffer, @@ -189,14 +267,9 @@ def read_csv(filepath_or_buffer, error_bad_lines=True, warn_bad_lines=True, skipfooter=0, - skip_footer=0, doublequote=True, delim_whitespace=False, - as_recarray=None, - compact_ints=None, - use_unsigned=None, low_memory=True, - buffer_lines=None, memory_map=False, float_precision=None): """Read csv file from local disk. 
@@ -208,110 +281,93 @@ def read_csv(filepath_or_buffer,
         kwargs: Keyword arguments in pandas::from_csv
     """
-    kwargs = {
-        'sep': sep,
-        'delimiter': delimiter,
-        'header': header,
-        'names': names,
-        'index_col': index_col,
-        'usecols': usecols,
-        'squeeze': squeeze,
-        'prefix': prefix,
-        'mangle_dupe_cols': mangle_dupe_cols,
-        'dtype': dtype,
-        'engine': engine,
-        'converters': converters,
-        'true_values': true_values,
-        'false_values': false_values,
-        'skipinitialspace': skipinitialspace,
-        'skiprows': skiprows,
-        'nrows': nrows,
-        'na_values': na_values,
-        'keep_default_na': keep_default_na,
-        'na_filter': na_filter,
-        'verbose': verbose,
-        'skip_blank_lines': skip_blank_lines,
-        'parse_dates': parse_dates,
-        'infer_datetime_format': infer_datetime_format,
-        'keep_date_col': keep_date_col,
-        'date_parser': date_parser,
-        'dayfirst': dayfirst,
-        'iterator': iterator,
-        'chunksize': chunksize,
-        'compression': compression,
-        'thousands': thousands,
-        'decimal': decimal,
-        'lineterminator': lineterminator,
-        'quotechar': quotechar,
-        'quoting': quoting,
-        'escapechar': escapechar,
-        'comment': comment,
-        'encoding': encoding,
-        'dialect': dialect,
-        'tupleize_cols': tupleize_cols,
-        'error_bad_lines': error_bad_lines,
-        'warn_bad_lines': warn_bad_lines,
-        'skipfooter': skipfooter,
-        'skip_footer': skip_footer,
-        'doublequote': doublequote,
-        'delim_whitespace': delim_whitespace,
-        'as_recarray': as_recarray,
-        'compact_ints': compact_ints,
-        'use_unsigned': use_unsigned,
-        'low_memory': low_memory,
-        'buffer_lines': buffer_lines,
-        'memory_map': memory_map,
-        'float_precision': float_precision,
-    }
-
-    # Default to Pandas read_csv for non-serializable objects
-    if not isinstance(filepath_or_buffer, str) or \
-            _infer_compression(filepath_or_buffer, compression) is not None:
-
-        warnings.warn("Defaulting to Pandas implementation",
+    kwargs = dict(
+        sep=sep,
+        delimiter=delimiter,
+        header=header,
+        names=names,
+        index_col=index_col,
+        usecols=usecols,
+        squeeze=squeeze,
+        prefix=prefix,
+        mangle_dupe_cols=mangle_dupe_cols,
+        dtype=dtype,
+        engine=engine,
+        converters=converters,
+        true_values=true_values,
+        false_values=false_values,
+        skipinitialspace=skipinitialspace,
+        skiprows=skiprows,
+        nrows=nrows,
+        na_values=na_values,
+        keep_default_na=keep_default_na,
+        na_filter=na_filter,
+        verbose=verbose,
+        skip_blank_lines=skip_blank_lines,
+        parse_dates=parse_dates,
+        infer_datetime_format=infer_datetime_format,
+        keep_date_col=keep_date_col,
+        date_parser=date_parser,
+        dayfirst=dayfirst,
+        iterator=iterator,
+        chunksize=chunksize,
+        compression=compression,
+        thousands=thousands,
+        decimal=decimal,
+        lineterminator=lineterminator,
+        quotechar=quotechar,
+        quoting=quoting,
+        escapechar=escapechar,
+        comment=comment,
+        encoding=encoding,
+        dialect=dialect,
+        tupleize_cols=tupleize_cols,
+        error_bad_lines=error_bad_lines,
+        warn_bad_lines=warn_bad_lines,
+        skipfooter=skipfooter,
+        doublequote=doublequote,
+        delim_whitespace=delim_whitespace,
+        low_memory=low_memory,
+        memory_map=memory_map,
+        float_precision=float_precision)
+
+    if isinstance(filepath_or_buffer, str):
+        if not os.path.exists(filepath_or_buffer):
+            warnings.warn(("File not found on disk. "
+                           "Defaulting to Pandas implementation."),
+                          PendingDeprecationWarning)
+
+            return _read_csv_from_pandas(filepath_or_buffer, kwargs)
+    elif not isinstance(filepath_or_buffer, pathlib.Path) and \
+            not isinstance(filepath_or_buffer, py.path.local):
+        warnings.warn(("Reading from buffer. "
" + "Defaulting to Pandas implementation."), PendingDeprecationWarning) - pandas_obj = pandas.read_csv(filepath_or_buffer, **kwargs) - if isinstance(pandas_obj, pandas.DataFrame): - return from_pandas(pandas_obj, get_npartitions()) + return _read_csv_from_pandas(filepath_or_buffer, kwargs) - return pandas_obj + if _infer_compression(filepath_or_buffer, compression) is not None: + warnings.warn(("Compression detected. " + "Defaulting to Pandas implementation."), + PendingDeprecationWarning) - filepath = filepath_or_buffer + return _read_csv_from_pandas(filepath_or_buffer, kwargs) - # TODO: handle case where header is a list of lines - first_line = _get_firstline(filepath) - columns = _infer_column(first_line, kwargs=kwargs) - if header is None or (header == "infer" and names is not None): - first_line = b"" - ignore_first_line = False - else: - ignore_first_line = True - - offsets = _compute_offset(filepath, get_npartitions(), - ignore_first_line=ignore_first_line) - - # Serialize objects to speed up later use in remote tasks - first_line_id = ray.put(first_line) - kwargs_id = ray.put(kwargs) - - df_obj_ids = [] - index_obj_ids = [] - for start, end in offsets: - if start != 0: - df, index = _read_csv_with_offset._submit( - args=(filepath, start, end, kwargs_id, first_line_id), - num_return_vals=2) - else: - df, index = _read_csv_with_offset._submit( - args=(filepath, start, end, kwargs_id), - num_return_vals=2) - df_obj_ids.append(df) - index_obj_ids.append(index) + if chunksize is not None: + warnings.warn(("Reading chunks from a file. " + "Defaulting to Pandas implementation."), + PendingDeprecationWarning) + + return _read_csv_from_pandas(filepath_or_buffer, kwargs) + + if skiprows is not None and not isinstance(skiprows, int): + warnings.warn(("Defaulting to Pandas implementation. To speed up " + "read_csv through the Pandas on Ray implementation, " + "comment the rows to skip instead.")) - index = get_index.remote(*index_obj_ids) if index_col is not None else None + return _read_csv_from_pandas(filepath_or_buffer, kwargs) - return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index) + return _read_csv_from_file(filepath_or_buffer, get_npartitions(), kwargs) def read_json(path_or_buf=None, diff --git a/modin/dataframe/test/test_io.py b/modin/dataframe/test/test_io.py index a2383a31c39..f59b705a194 100644 --- a/modin/dataframe/test/test_io.py +++ b/modin/dataframe/test/test_io.py @@ -312,6 +312,35 @@ def test_from_csv(): teardown_csv_file() +def test_from_csv_chunksize(): + setup_csv_file(SMALL_ROW_SIZE) + + # Tests __next__ and correctness of reader as an iterator + rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1) + pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1) + + for ray_df, pd_df in zip(rdf_reader, pd_reader): + assert ray_df_equals_pandas(ray_df, pd_df) + + # Tests that get_chunk works correctly + rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1) + pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1) + + ray_df = rdf_reader.get_chunk(1) + pd_df = pd_reader.get_chunk(1) + + assert ray_df_equals_pandas(ray_df, pd_df) + + # Tests that read works correctly + rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1) + pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1) + + ray_df = rdf_reader.read() + pd_df = pd_reader.read() + + assert ray_df_equals_pandas(ray_df, pd_df) + + def test_from_json(): setup_json_file(SMALL_ROW_SIZE)