
Commit 5e476ed

Use spark.write.csv in to_csv of Series and DataFrame (#749)
This PR proposes to use the `spark.write.csv` API to enable distributed computation when `path` is specified. If `path` is not specified, it simply calls pandas' `to_csv` as before. Closes #677
1 parent 7d25b56 commit 5e476ed
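
In short, the method now has two code paths. A minimal usage sketch (the output path below is hypothetical):

    import databricks.koalas as ks

    kdf = ks.DataFrame({'x': [1, 2, 3]})

    # No path: data is collected to the driver and pandas' to_csv
    # returns the CSV content as a string.
    csv_string = kdf.to_csv()

    # With a path: the write is distributed via spark.write.csv, so `path`
    # becomes a directory containing `part-...` files; num_files=1
    # repartitions the data down to a single output file.
    kdf.to_csv(path='/tmp/to_csv_example', num_files=1)  # hypothetical path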

File tree

3 files changed: +197 −131 lines changed

databricks/koalas/generic.py (+107 −81)
@@ -20,12 +20,14 @@
 import warnings
 from collections import Counter
 from collections.abc import Iterable
+from distutils.version import LooseVersion
 
 import numpy as np
 import pandas as pd
 
 from pyspark import sql as spark
 from pyspark.sql import functions as F
+from pyspark.sql.readwriter import OptionUtils
 from pyspark.sql.types import DataType, DoubleType, FloatType
 
 from databricks import koalas as ks  # For running doctests and reference resolution in PyCharm.
@@ -456,118 +458,142 @@ def to_numpy(self):
         """
         return self.to_pandas().values
 
-    def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
-               columns=None, header=True, index=True, index_label=None,
-               mode='w', encoding=None, compression='infer', quoting=None,
-               quotechar='"', line_terminator="\n", chunksize=None,
-               tupleize_cols=None, date_format=None, doublequote=True,
-               escapechar=None, decimal='.'):
-        """
+    def to_csv(self, path=None, sep=',', na_rep='', columns=None, header=True,
+               quotechar='"', date_format=None, escapechar=None, num_files=None,
+               **options):
+        r"""
         Write object to a comma-separated values (csv) file.
 
-        .. note:: This method should only be used if the resulting CSV is expected
-            to be small, as all the data is loaded into the driver's memory.
+        .. note:: Koalas `to_csv` writes files to a path or URI. Unlike pandas,
+            Koalas respects HDFS properties such as 'fs.default.name'.
+
+        .. note:: Koalas writes CSV files into the directory `path`, and writes
+            multiple `part-...` files in that directory when `path` is specified.
+            This behaviour is inherited from Apache Spark. The number of files
+            can be controlled by `num_files`.
 
         Parameters
         ----------
-        path_or_buf : str or file handle, default None
-            File path or object, if None is provided the result is returned as
-            a string. If a file object is passed it should be opened with
-            `newline=''`, disabling universal newlines.
-
+        path : str, default None
+            File path. If None is provided the result is returned as a string.
         sep : str, default ','
             String of length 1. Field delimiter for the output file.
         na_rep : str, default ''
            Missing data representation.
-        float_format : str, default None
-            Format string for floating point numbers.
         columns : sequence, optional
            Columns to write.
         header : bool or list of str, default True
             Write out the column names. If a list of strings is given it is
             assumed to be aliases for the column names.
-        index : bool, default True
-            Write row names (index).
-        index_label : str or sequence, or False, default None
-            Column label for index column(s) if desired. If None is given, and
-            `header` and `index` are True, then the index names are used. A
-            sequence should be given if the object uses MultiIndex. If
-            False do not print fields for index names. Use index_label=False
-            for easier importing in R.
-        mode : str
-            Python write mode, default 'w'.
-        encoding : str, optional
-            A string representing the encoding to use in the output file,
-            defaults to 'ascii' on Python 2 and 'utf-8' on Python 3.
-        compression : str, default 'infer'
-            Compression mode among the following possible values: {'infer',
-            'gzip', 'bz2', 'zip', 'xz', None}. If 'infer' and `path_or_buf`
-            is path-like, then detect compression from the following
-            extensions: '.gz', '.bz2', '.zip' or '.xz' (otherwise no
-            compression).
-        quoting : optional constant from csv module
-            Defaults to csv.QUOTE_MINIMAL. If you have set a `float_format`
-            then floats are converted to strings and thus csv.QUOTE_NONNUMERIC
-            will treat them as non-numeric.
         quotechar : str, default '\"'
             String of length 1. Character used to quote fields.
-        line_terminator : string, default '\\n'
-            The newline character or character sequence to use in the output
-            file. Defaults to `os.linesep`, which depends on the OS in which
-            this method is called ('\n' for Linux, '\r\n' for Windows, e.g.).
-        chunksize : int or None
-            Rows to write at a time.
-        tupleize_cols : bool, default False
-            Write MultiIndex columns as a list of tuples (if True) or in
-            the new, expanded format, where each MultiIndex column is a row
-            in the CSV (if False).
         date_format : str, default None
             Format string for datetime objects.
-        doublequote : bool, default True
-            Control quoting of `quotechar` inside a field.
         escapechar : str, default None
             String of length 1. Character used to escape `sep` and `quotechar`
             when appropriate.
-        decimal : str, default '.'
-            Character recognized as decimal separator. E.g. use ',' for
-            European data.
-
-        Returns
-        -------
-        None or str
-            If path_or_buf is None, returns the resulting csv format as a
-            string. Otherwise returns None.
+        num_files : int, optional
+            The number of files to be written in the `path` directory when
+            `path` is specified.
+        options : keyword arguments for additional options specific to PySpark.
+            These kwargs are passed to PySpark's CSV data source as options;
+            check PySpark's API documentation for spark.write.csv(...). They
+            take higher priority and overwrite all other options. This
+            parameter only works when `path` is specified.
 
         See Also
         --------
-        read_csv : Reading CSV files.
+        read_csv
+        DataFrame.to_delta
+        DataFrame.to_table
+        DataFrame.to_parquet
+        DataFrame.to_spark_io
 
         Examples
         --------
-        >>> df = ks.DataFrame({'name': ['Raphael', 'Donatello'],
-        ...                    'mask': ['red', 'purple'],
-        ...                    'weapon': ['sai', 'bo staff']},
-        ...                   columns=['name', 'mask', 'weapon'])
-        >>> df.to_csv(index=False)
-        'name,mask,weapon\\nRaphael,red,sai\\nDonatello,purple,bo staff\\n'
-        >>> df.name.to_csv()  # doctest: +ELLIPSIS
-        '...Raphael\\n1,Donatello\\n'
+        >>> df = ks.DataFrame(dict(
+        ...    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
+        ...    country=['KR', 'US', 'JP'],
+        ...    code=[1, 2, 3]), columns=['date', 'country', 'code'])
+        >>> df.sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                           date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        >>> print(df.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date,country,code
+        2012-01-31 12:00:00,KR,1
+        2012-02-29 12:00:00,US,2
+        2012-03-31 12:00:00,JP,3
+
+        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...    path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                           date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        In case of Series,
+
+        >>> print(df.date.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date
+        2012-01-31 12:00:00
+        2012-02-29 12:00:00
+        2012-03-31 12:00:00
+
+        >>> df.date.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...    path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                          date
+        ... 2012-01-31 12:00:00
+        ... 2012-02-29 12:00:00
+        ... 2012-03-31 12:00:00
         """
+        if path is None:
+            # If path is None, just collect and use pandas' to_csv.
+            kdf_or_ser = self
+            if (LooseVersion("0.24") > LooseVersion(pd.__version__)) and \
+                    isinstance(self, ks.Series):
+                # pandas 0.23 does not seem to have the 'columns' parameter in Series' to_csv.
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep, header=header,
+                    date_format=date_format, index=False)
+            else:
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep, columns=columns,
+                    header=header, quotechar=quotechar,
+                    date_format=date_format, escapechar=escapechar, index=False)
+
+        if columns is not None:
+            data_columns = columns
+        else:
+            data_columns = self._internal.data_columns
 
-        # Make sure locals() call is at the top of the function so we don't capture local variables.
-        args = locals()
         kdf = self
-
-        if isinstance(self, ks.DataFrame):
-            f = pd.DataFrame.to_csv
-        elif isinstance(self, ks.Series):
-            f = pd.Series.to_csv
+        if isinstance(self, ks.Series):
+            kdf = self._kdf
+
+        if isinstance(header, list):
+            sdf = kdf._sdf.select(
+                [self._internal.scol_for(old_name).alias(new_name)
+                 for (old_name, new_name) in zip(data_columns, header)])
+            header = True
         else:
-            raise TypeError('Constructor expects DataFrame or Series; however, '
-                            'got [%s]' % (self,))
-
-        return validate_arguments_and_invoke_function(
-            kdf._to_internal_pandas(), self.to_csv, f, args)
+            sdf = kdf._sdf.select(data_columns)
+
+        if num_files is not None:
+            sdf = sdf.repartition(num_files)
+
+        builder = sdf.write.mode("overwrite")
+        OptionUtils._set_opts(
+            builder,
+            path=path, sep=sep, nullValue=na_rep, header=header,
+            quote=quotechar, dateFormat=date_format,
+            charToEscapeQuoteEscaping=escapechar)
+        builder.options(**options).format("csv").save(path)
 
     def to_json(self, path_or_buf=None, orient=None, date_format=None,
                 double_precision=10, force_ascii=True, date_unit='ms',
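A usage note on the `**options` pass-through: because `builder.options(**options)` is applied after the pandas-style arguments are mapped via `OptionUtils._set_opts`, a Spark CSV option passed directly wins, matching the docstring's "higher priority" claim. A hedged sketch (the output path is hypothetical):

    import databricks.koalas as ks

    kdf = ks.DataFrame({'x': [1.0, None, 3.0]})

    # 'nullValue' is the Spark CSV option that na_rep maps to; passing it
    # through **options overrides the na_rep mapping, since options are
    # applied last on the writer.
    kdf.to_csv(path='/tmp/to_csv_options', na_rep='', nullValue='NULL',
               num_files=1)  # hypothetical path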