Commit b41a9c3

Use spark.write.csv in to_csv of Series and DataFrame
1 parent 28f29da commit b41a9c3
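
In short, `to_csv` now delegates to Spark's CSV writer whenever a path is given, and collects to pandas when it is not. A minimal usage sketch of the resulting behavior, assuming a running Spark session (the DataFrame and output path below are illustrative, not taken from the commit):

    import databricks.koalas as ks

    kdf = ks.DataFrame({'x': [1, 2, 3]})

    # No path: data is collected to the driver and pandas' to_csv
    # returns the CSV content as a string.
    csv_string = kdf.to_csv()

    # With a path: Spark writes a directory of part-... files under the
    # given path; num_files repartitions the data so that exactly that
    # many files are produced.
    kdf.to_csv(path='/tmp/to_csv_demo', num_files=1)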

File tree

4 files changed: +178 -155 lines changed

databricks/koalas/generic.py (+90 -31)
@@ -456,21 +456,24 @@ def to_numpy(self):
         """
         return self.to_pandas().values

-    def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True,
-               index=True, encoding=None, quotechar='"', date_format=None, escapechar=None):
+    def to_csv(self, path=None, sep=',', na_rep='', columns=None, header=True,
+               quotechar='"', date_format=None, escapechar=None, num_files=None,
+               **kwargs):
         r"""
         Write object to a comma-separated values (csv) file.

-        .. note:: Spark writes files to HDFS by default.
-            If you want to save the file locally, you need to use path like below
-            `'files:/' + local paths` like 'files:/work/data.csv'. Otherwise,
-            you will write the file to the HDFS path where the spark program starts.
+        .. note:: Koalas `to_csv` writes files to a path or URI. Unlike pandas',
+            Koalas respects HDFS's property such as 'fs.default.name'.
+
+        .. note:: Koalas writes CSV files into the directory, `path`, and writes
+            multiple `part-...` files in the directory when `path` is specified.
+            This behaviour was inherited from Apache Spark. The number of files
+            can be controlled by `num_files`.

         Parameters
         ----------
-        path_or_buf : str or file handle, default None
-            File path or object, if None is provided the result is returned as
-            a string.
+        path : str, default None
+            File path. If None is provided the result is returned as a string.
         sep : str, default ','
             String of length 1. Field delimiter for the output file.
         na_rep : str, default ''
@@ -480,18 +483,20 @@ def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True
         header : bool or list of str, default True
             Write out the column names. If a list of strings is given it is
             assumed to be aliases for the column names.
-        index : bool, default True
-            Write row names (index).
-        encoding : str, optional
-            A string representing the encoding to use in the output file,
-            defaults to 'utf-8'.
         quotechar : str, default '\"'
             String of length 1. Character used to quote fields.
         date_format : str, default None
             Format string for datetime objects.
         escapechar : str, default None
             String of length 1. Character used to escape `sep` and `quotechar`
             when appropriate.
+        num_files : the number of files to be written in `path` directory when
+            this is a path.
+        kwargs : keyword arguments for additional options specific to PySpark.
+            These kwargs are passed through to PySpark's CSV options. Check
+            the options in PySpark's API documentation for spark.write.csv(...).
+            They have higher priority and overwrite all other options.
+            This parameter only works when `path` is specified.

         See Also
         --------
@@ -500,40 +505,94 @@ def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True
         DataFrame.to_table
         DataFrame.to_parquet
         DataFrame.to_spark_io
+
         Examples
         --------
         >>> df = ks.DataFrame(dict(
         ...    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
         ...    country=['KR', 'US', 'JP'],
         ...    code=[1, 2 ,3]), columns=['date', 'country', 'code'])
-        >>> df
-                         date country  code
-        0 2012-01-31 12:00:00      KR     1
-        1 2012-02-29 12:00:00      US     2
-        2 2012-03-31 12:00:00      JP     3
-        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path)
+        >>> df.sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                          date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        >>> print(df.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date,country,code
+        2012-01-31 12:00:00,KR,1
+        2012-02-29 12:00:00,US,2
+        2012-03-31 12:00:00,JP,3
+
+        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...    path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                          date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        In the case of a Series,
+
+        >>> print(df.date.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date
+        2012-01-31 12:00:00
+        2012-02-29 12:00:00
+        2012-03-31 12:00:00
+
+        >>> df.date.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...    path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                          date
+        ... 2012-01-31 12:00:00
+        ... 2012-02-29 12:00:00
+        ... 2012-03-31 12:00:00
         """
+        if path is None:
+            # If path is None, just collect and use pandas' to_csv.
+            kdf_or_ser = self
+            if isinstance(self, ks.DataFrame):
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep, columns=columns,
+                    header=header, quotechar=quotechar,
+                    date_format=date_format, escapechar=escapechar, index=False)
+            elif isinstance(self, ks.Series):
+                # pandas 0.23 does not seem to have a 'columns' parameter in Series' to_csv.
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep,
+                    header=header, quotechar=quotechar,
+                    date_format=date_format, escapechar=escapechar, index=False)
+            else:
+                raise TypeError('Constructor expects DataFrame or Series; however, '
+                                'got [%s]' % (self,))
+
         if columns is not None:
             data_columns = columns
         else:
             data_columns = self._internal.data_columns

-        if index:
-            index_columns = self._internal.index_columns
-        else:
-            index_columns = []
+        kdf = self
+        if isinstance(self, ks.Series):
+            kdf = self._kdf

         if isinstance(header, list):
-            sdf = self._sdf.select(index_columns +
-                                   [self._internal.scol_for(old_name).alias(new_name)
-                                    for (old_name, new_name) in zip(data_columns, header)])
+            sdf = kdf._sdf.select(
+                [self._internal.scol_for(old_name).alias(new_name)
+                 for (old_name, new_name) in zip(data_columns, header)])
             header = True
         else:
-            sdf = self._sdf.select(index_columns + data_columns)
+            sdf = kdf._sdf.select(data_columns)
+
+        if num_files is not None:
+            sdf = sdf.repartition(num_files)

-        sdf.write.csv(path=path_or_buf, sep=sep, nullValue=na_rep, header=header,
-                      encoding=encoding, quote=quotechar, dateFormat=date_format,
-                      charToEscapeQuoteEscaping=escapechar)
+        builder = sdf.write.mode("overwrite").options(
+            path=path, sep=sep, nullValue=na_rep, header=header,
+            quote=quotechar, dateFormat=date_format,
+            charToEscapeQuoteEscaping=escapechar)
+        builder.options(**kwargs).format("csv").save(path)

     def to_json(self, path_or_buf=None, orient=None, date_format=None,
                 double_precision=10, force_ascii=True, date_unit='ms',
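
One detail of the new writer chain deserves a note: `.options()` on a PySpark DataFrameWriter merges keys into a single option map, so the later `builder.options(**kwargs)` call replaces any named parameter that shares a key. A short sketch of that precedence, assuming an existing Spark DataFrame `sdf` and an illustrative output path:

    # sep=',' is set first by the named parameters ...
    builder = sdf.write.mode("overwrite").options(sep=',', nullValue='')
    # ... and a later .options() call with the same key wins, which is why
    # kwargs is documented as having higher priority than the named arguments.
    builder.options(sep='|').format("csv").save('/tmp/out')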

databricks/koalas/series.py (-79)
@@ -1016,85 +1016,6 @@ def reset_index(self, level=None, drop=False, name=None, inplace=False):
         else:
             return kdf

-    def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True,
-               index=True, encoding=None, quotechar='"', date_format=None, escapechar=None):
-        r"""
-        Write object to a comma-separated values (csv) file.
-
-        .. note:: Spark writes files to HDFS by default.
-            If you want to save the file locally, you need to use path like below
-            `'files:/' + local paths` like 'files:/work/data.csv'. Otherwise,
-            you will write the file to the HDFS path where the spark program starts.
-
-        Parameters
-        ----------
-        path_or_buf : str or file handle, default None
-            File path or object, if None is provided the result is returned as
-            a string.
-        sep : str, default ','
-            String of length 1. Field delimiter for the output file.
-        na_rep : str, default ''
-            Missing data representation.
-        columns : sequence, optional
-            Columns to write.
-        header : bool or list of str, default True
-            Write out the column names. If a list of strings is given it is
-            assumed to be aliases for the column names.
-        index : bool, default True
-            Write row names (index).
-        encoding : str, optional
-            A string representing the encoding to use in the output file,
-            defaults to 'utf-8'.
-        quotechar : str, default '\"'
-            String of length 1. Character used to quote fields.
-        date_format : str, default None
-            Format string for datetime objects.
-        escapechar : str, default None
-            String of length 1. Character used to escape `sep` and `quotechar`
-            when appropriate.
-
-        See Also
-        --------
-        read_csv
-        DataFrame.to_delta
-        DataFrame.to_table
-        DataFrame.to_parquet
-        DataFrame.to_spark_io
-        Examples
-        --------
-        >>> df = ks.DataFrame(dict(
-        ...    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
-        ...    country=['KR', 'US', 'JP'],
-        ...    code=[1, 2 ,3]), columns=['date', 'country', 'code'])
-        >>> df
-                         date country  code
-        0 2012-01-31 12:00:00      KR     1
-        1 2012-02-29 12:00:00      US     2
-        2 2012-03-31 12:00:00      JP     3
-        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path)
-        """
-        if columns is not None:
-            data_columns = columns
-        else:
-            data_columns = self._internal.data_columns
-
-        if index:
-            index_columns = self._internal.index_columns
-        else:
-            index_columns = []
-
-        if isinstance(header, list):
-            sdf = self._sdf.select(index_columns +
-                                   [self._internal.scol_for(old_name).alias(new_name)
-                                    for (old_name, new_name) in zip(data_columns, header)])
-            header = True
-        else:
-            sdf = self._sdf.select(index_columns + data_columns)
-
-        sdf.write.csv(path=path_or_buf, sep=sep, nullValue=na_rep, header=header,
-                      encoding=encoding, quote=quotechar, dateFormat=date_format,
-                      charToEscapeQuoteEscaping=escapechar)
-
     def to_frame(self, name=None) -> spark.DataFrame:
         """
         Convert Series to DataFrame.