Use spark.write.csv in to_csv of Series and DataFrame #749

Merged · 2 commits · Sep 10, 2019

Changes from all commits
188 changes: 107 additions & 81 deletions databricks/koalas/generic.py
@@ -20,12 +20,14 @@
import warnings
from collections import Counter
from collections.abc import Iterable
from distutils.version import LooseVersion

import numpy as np
import pandas as pd

from pyspark import sql as spark
from pyspark.sql import functions as F
from pyspark.sql.readwriter import OptionUtils
from pyspark.sql.types import DataType, DoubleType, FloatType

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
@@ -456,118 +458,142 @@ def to_numpy(self):
"""
return self.to_pandas().values

def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
           columns=None, header=True, index=True, index_label=None,
           mode='w', encoding=None, compression='infer', quoting=None,
           quotechar='"', line_terminator="\n", chunksize=None,
           tupleize_cols=None, date_format=None, doublequote=True,
           escapechar=None, decimal='.'):
"""
def to_csv(self, path=None, sep=',', na_rep='', columns=None, header=True,
           quotechar='"', date_format=None, escapechar=None, num_files=None,
           **options):
r"""
Write object to a comma-separated values (csv) file.

.. note:: This method should only be used if the resulting CSV is expected
to be small, as all the data is loaded into the driver's memory.
.. note:: Koalas `to_csv` writes files to a path or URI. Unlike pandas',
Koalas respects HDFS's properties such as 'fs.default.name'.

.. note:: Koalas writes CSV files into a directory, `path`, producing
multiple `part-...` files in that directory when `path` is specified.
This behaviour is inherited from Apache Spark. The number of files can
be controlled by `num_files`.

Parameters
----------
path_or_buf : str or file handle, default None
File path or object, if None is provided the result is returned as
a string. If a file object is passed it should be opened with
`newline=''`, disabling universal newlines.

path : str, default None
File path. If None is provided the result is returned as a string.
sep : str, default ','
String of length 1. Field delimiter for the output file.
na_rep : str, default ''
Missing data representation.
float_format : str, default None
Format string for floating point numbers.
columns : sequence, optional
Columns to write.
header : bool or list of str, default True
Write out the column names. If a list of strings is given it is
assumed to be aliases for the column names.
index : bool, default True
Write row names (index).
index_label : str or sequence, or False, default None
Column label for index column(s) if desired. If None is given, and
`header` and `index` are True, then the index names are used. A
sequence should be given if the object uses MultiIndex. If
False do not print fields for index names. Use index_label=False
for easier importing in R.
mode : str
Python write mode, default 'w'.
encoding : str, optional
A string representing the encoding to use in the output file,
defaults to 'ascii' on Python 2 and 'utf-8' on Python 3.
compression : str, default 'infer'
Compression mode among the following possible values: {'infer',
'gzip', 'bz2', 'zip', 'xz', None}. If 'infer' and `path_or_buf`
is path-like, then detect compression from the following
extensions: '.gz', '.bz2', '.zip' or '.xz'. (otherwise no
compression).
quoting : optional constant from csv module
Defaults to csv.QUOTE_MINIMAL. If you have set a `float_format`
then floats are converted to strings and thus csv.QUOTE_NONNUMERIC
will treat them as non-numeric.
quotechar : str, default '\"'
String of length 1. Character used to quote fields.
line_terminator : string, default '\\n'
The newline character or character sequence to use in the output
file. Defaults to `os.linesep`, which depends on the OS in which
this method is called (e.g. '\n' for Linux, '\r\n' for Windows).
chunksize : int or None
Rows to write at a time.
tupleize_cols : bool, default False
Write MultiIndex columns as a list of tuples (if True) or in
the new, expanded format, where each MultiIndex column is a row
in the CSV (if False).
date_format : str, default None
Format string for datetime objects.
doublequote : bool, default True
Control quoting of `quotechar` inside a field.
escapechar : str, default None
String of length 1. Character used to escape `sep` and `quotechar`
when appropriate.
decimal : str, default '.'
Character recognized as decimal separator. E.g. use ',' for
European data.

Returns
-------
None or str
If path_or_buf is None, returns the resulting csv format as a
string. Otherwise returns None.
num_files : the number of files to be written in the `path` directory when
`path` is specified.
options : keyword arguments for additional options specific to PySpark.
These kwargs are passed through to PySpark's CSV writer; check the available
options in PySpark's API documentation for spark.write.csv(...). They have
higher priority and overwrite all other options. This parameter only works
when `path` is specified.

See Also
--------
read_csv : Reading CSV files.
read_csv
DataFrame.to_delta
DataFrame.to_table
DataFrame.to_parquet
DataFrame.to_spark_io

Examples
--------
>>> df = ks.DataFrame({'name': ['Raphael', 'Donatello'],
... 'mask': ['red', 'purple'],
... 'weapon': ['sai', 'bo staff']},
... columns=['name', 'mask', 'weapon'])
>>> df.to_csv(index=False)
'name,mask,weapon\\nRaphael,red,sai\\nDonatello,purple,bo staff\\n'
>>> df.name.to_csv() # doctest: +ELLIPSIS
'...Raphael\\n1,Donatello\\n'
>>> df = ks.DataFrame(dict(
... date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
... country=['KR', 'US', 'JP'],
... code=[1, 2, 3]), columns=['date', 'country', 'code'])
>>> df.sort_values(by="date") # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE

Collaborator: +ELLIPSIS is not needed?

Member (author): oh, that's needed for the ... in the index.

Member (author): here:

Expected:
                       date
    ... 2012-01-31 12:00:00
    ... 2012-02-29 12:00:00
    ... 2012-03-31 12:00:00
Got:
                     date
    0 2012-01-31 12:00:00
    1 2012-02-29 12:00:00
    2 2012-03-31 12:00:00
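
For context, the directive the thread is discussing is standard doctest
behaviour: with +ELLIPSIS enabled, a literal ... in the expected output
matches any text, which is what lets the non-deterministic index values
above pass. A minimal self-contained sketch (the names and data are
illustrative, not from this PR):

    import doctest

    def demo():
        """
        >>> import pandas as pd
        >>> pd.DataFrame({'x': [10, 20]})  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        x
        ... 10
        ... 20
        """

    # Passes: '...' matches the index values 0 and 1; without +ELLIPSIS
    # the expected output would have to spell them out exactly.
    doctest.run_docstring_examples(demo, {}, verbose=False)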

date country code
... 2012-01-31 12:00:00 KR 1
... 2012-02-29 12:00:00 US 2
... 2012-03-31 12:00:00 JP 3

>>> print(df.to_csv()) # doctest: +NORMALIZE_WHITESPACE
date,country,code
2012-01-31 12:00:00,KR,1
2012-02-29 12:00:00,US,2
2012-03-31 12:00:00,JP,3

>>> df.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
>>> ks.read_csv(
... path=r'%s/to_csv/foo.csv' % path
... ).sort_values(by="date") # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE

Collaborator: ditto.

date country code
... 2012-01-31 12:00:00 KR 1
... 2012-02-29 12:00:00 US 2
... 2012-03-31 12:00:00 JP 3

In the case of a Series,

>>> print(df.date.to_csv()) # doctest: +NORMALIZE_WHITESPACE
date
2012-01-31 12:00:00
2012-02-29 12:00:00
2012-03-31 12:00:00

>>> df.date.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
>>> ks.read_csv(
... path=r'%s/to_csv/foo.csv' % path
... ).sort_values(by="date") # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE

Collaborator: ditto.

date
... 2012-01-31 12:00:00
... 2012-02-29 12:00:00
... 2012-03-31 12:00:00
"""
if path is None:
    # If path is None, just collect and use pandas's to_csv.
    kdf_or_ser = self
    if (LooseVersion("0.24") > LooseVersion(pd.__version__)) and \
            isinstance(self, ks.Series):
        # pandas 0.23 does not seem to have the 'columns' parameter in Series' to_csv.
        return kdf_or_ser.to_pandas().to_csv(
            None, sep=sep, na_rep=na_rep, header=header,
            date_format=date_format, index=False)
    else:
        return kdf_or_ser.to_pandas().to_csv(
            None, sep=sep, na_rep=na_rep, columns=columns,
            header=header, quotechar=quotechar,
            date_format=date_format, escapechar=escapechar, index=False)

if columns is not None:
    data_columns = columns
else:
    data_columns = self._internal.data_columns

# Make sure locals() call is at the top of the function so we don't capture local variables.
args = locals()
kdf = self

if isinstance(self, ks.DataFrame):
    f = pd.DataFrame.to_csv
elif isinstance(self, ks.Series):
    f = pd.Series.to_csv
if isinstance(self, ks.Series):
    kdf = self._kdf

if isinstance(header, list):
    sdf = kdf._sdf.select(
        [self._internal.scol_for(old_name).alias(new_name)
         for (old_name, new_name) in zip(data_columns, header)])
    header = True
else:
    raise TypeError('Constructor expects DataFrame or Series; however, '
                    'got [%s]' % (self,))

return validate_arguments_and_invoke_function(
    kdf._to_internal_pandas(), self.to_csv, f, args)
    sdf = kdf._sdf.select(data_columns)

if num_files is not None:
    sdf = sdf.repartition(num_files)

builder = sdf.write.mode("overwrite")
OptionUtils._set_opts(
    builder,
    path=path, sep=sep, nullValue=na_rep, header=header,
    quote=quotechar, dateFormat=date_format,
    charToEscapeQuoteEscaping=escapechar)
builder.options(**options).format("csv").save(path)
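
For readers unfamiliar with the Spark side, the write path above boils down
to plain PySpark DataFrameWriter calls. A rough standalone sketch, where the
SparkSession, the data, and the output directory are illustrative assumptions
rather than anything from this PR:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(
        [("Raphael", "red"), ("Donatello", "purple")], ["name", "mask"])

    # Roughly what to_csv(path=..., num_files=1) does under the hood:
    (sdf.repartition(1)               # num_files -> number of part-... files
        .write.mode("overwrite")      # the method always overwrites `path`
        .option("sep", ",")           # sep maps to Spark's 'sep' option
        .option("nullValue", "")      # na_rep maps to 'nullValue'
        .option("header", True)
        .format("csv")
        .save("/tmp/to_csv_demo"))    # writes a directory of part-... files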

def to_json(self, path_or_buf=None, orient=None, date_format=None,
            double_precision=10, force_ascii=True, date_unit='ms',