
Commit 361ac19

Use spark.write.csv in to_csv of Series and DataFrame
1 parent 3a3f210 commit 361ac19

File tree

4 files changed: +161 -155 lines changed
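In short, to_csv on both DataFrame and Series now writes through Spark's DataFrameWriter (spark.write.csv) when a path is given, and falls back to pandas when it is not: the path_or_buf, index, and encoding parameters go away, and path and num_files come in. Read directly off the diff below, the public signature changes as follows:

# Before, per the removed lines
def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True,
           index=True, encoding=None, quotechar='"', date_format=None, escapechar=None): ...

# After, per the added lines
def to_csv(self, path=None, sep=',', na_rep='', columns=None, header=True,
           quotechar='"', date_format=None, escapechar=None, num_files=None): ...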

databricks/koalas/generic.py

+83 -31
@@ -456,21 +456,23 @@ def to_numpy(self):
         """
         return self.to_pandas().values

-    def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True,
-               index=True, encoding=None, quotechar='"', date_format=None, escapechar=None):
+    def to_csv(self, path=None, sep=',', na_rep='', columns=None, header=True,
+               quotechar='"', date_format=None, escapechar=None, num_files=None):
         r"""
         Write object to a comma-separated values (csv) file.

-        .. note:: Spark writes files to HDFS by default.
-            If you want to save the file locally, you need to use path like below
-            `'files:/' + local paths` like 'files:/work/data.csv'. Otherwise,
-            you will write the file to the HDFS path where the spark program starts.
+        .. note:: Koalas `to_csv` writes files to a path or URI. Unlike pandas,
+            Koalas respects HDFS properties such as 'fs.default.name'.
+
+        .. note:: Koalas writes CSV files into the directory `path`, producing
+            multiple `part-...` files in that directory when `path` is specified.
+            This behaviour is inherited from Apache Spark. The number of files can
+            be controlled by `num_files`.

         Parameters
         ----------
-        path_or_buf : str or file handle, default None
-            File path or object, if None is provided the result is returned as
-            a string.
+        path : str, default None
+            File path. If None is provided, the result is returned as a string.
         sep : str, default ','
             String of length 1. Field delimiter for the output file.
         na_rep : str, default ''
@@ -480,18 +482,15 @@ def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True
         header : bool or list of str, default True
             Write out the column names. If a list of strings is given it is
             assumed to be aliases for the column names.
-        index : bool, default True
-            Write row names (index).
-        encoding : str, optional
-            A string representing the encoding to use in the output file,
-            defaults to 'utf-8'.
         quotechar : str, default '\"'
             String of length 1. Character used to quote fields.
         date_format : str, default None
             Format string for datetime objects.
         escapechar : str, default None
             String of length 1. Character used to escape `sep` and `quotechar`
             when appropriate.
+        num_files : int, optional
+            The number of files to be written in the `path` directory.

         See Also
         --------
@@ -500,40 +499,93 @@ def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True
         DataFrame.to_table
         DataFrame.to_parquet
         DataFrame.to_spark_io
+
         Examples
         --------
         >>> df = ks.DataFrame(dict(
         ...     date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
         ...     country=['KR', 'US', 'JP'],
         ...     code=[1, 2 ,3]), columns=['date', 'country', 'code'])
-        >>> df
-                         date country  code
-        0 2012-01-31 12:00:00      KR     1
-        1 2012-02-29 12:00:00      US     2
-        2 2012-03-31 12:00:00      JP     3
-        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path)
+        >>> df.sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                           date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        >>> print(df.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date,country,code
+        2012-01-31 12:00:00,KR,1
+        2012-02-29 12:00:00,US,2
+        2012-03-31 12:00:00,JP,3
+
+        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...     path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                           date country  code
+        ... 2012-01-31 12:00:00      KR     1
+        ... 2012-02-29 12:00:00      US     2
+        ... 2012-03-31 12:00:00      JP     3
+
+        In case of Series,
+
+        >>> print(df.date.to_csv())  # doctest: +NORMALIZE_WHITESPACE
+        date
+        2012-01-31 12:00:00
+        2012-02-29 12:00:00
+        2012-03-31 12:00:00
+
+        >>> df.date.to_csv(path=r'%s/to_csv/foo.csv' % path, num_files=1)
+        >>> ks.read_csv(
+        ...     path=r'%s/to_csv/foo.csv' % path
+        ... ).sort_values(by="date")  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+                          date
+        ... 2012-01-31 12:00:00
+        ... 2012-02-29 12:00:00
+        ... 2012-03-31 12:00:00
         """
+        if path is None:
+            # If path is None, just collect and use pandas' to_csv.
+            kdf_or_ser = self
+            if isinstance(self, ks.DataFrame):
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep, columns=columns,
+                    header=header, quotechar=quotechar,
+                    date_format=date_format, escapechar=escapechar, index=False)
+            elif isinstance(self, ks.Series):
+                # pandas 0.23 seems not to have the 'columns' parameter in Series' to_csv.
+                return kdf_or_ser.to_pandas().to_csv(
+                    None, sep=sep, na_rep=na_rep,
+                    header=header, quotechar=quotechar,
+                    date_format=date_format, escapechar=escapechar, index=False)
+            else:
+                raise TypeError('Constructor expects DataFrame or Series; however, '
+                                'got [%s]' % (self,))

         if columns is not None:
             data_columns = columns
         else:
             data_columns = self._internal.data_columns

-        if index:
-            index_columns = self._internal.index_columns
-        else:
-            index_columns = []
+        kdf = self
+        if isinstance(self, ks.Series):
+            kdf = self._kdf

         if isinstance(header, list):
-            sdf = self._sdf.select(index_columns +
-                                   [self._internal.scol_for(old_name).alias(new_name)
-                                    for (old_name, new_name) in zip(data_columns, header)])
+            sdf = kdf._sdf.select(
+                [self._internal.scol_for(old_name).alias(new_name)
+                 for (old_name, new_name) in zip(data_columns, header)])
             header = True
         else:
-            sdf = self._sdf.select(index_columns + data_columns)
+            sdf = kdf._sdf.select(data_columns)
+
+        if num_files is not None:
+            sdf = sdf.repartition(num_files)

-        sdf.write.csv(path=path_or_buf, sep=sep, nullValue=na_rep, header=header,
-                      encoding=encoding, quote=quotechar, dateFormat=date_format,
-                      charToEscapeQuoteEscaping=escapechar)
+        sdf.write.mode("overwrite").csv(
+            path=path, sep=sep, nullValue=na_rep, header=header,
+            quote=quotechar, dateFormat=date_format,
+            charToEscapeQuoteEscaping=escapechar)

     def to_json(self, path_or_buf=None, orient=None, date_format=None,
                 double_precision=10, force_ascii=True, date_unit='ms',
databricks/koalas/series.py

-79
@@ -1016,85 +1016,6 @@ def reset_index(self, level=None, drop=False, name=None, inplace=False):
         else:
             return kdf

-    def to_csv(self, path_or_buf=None, sep=',', na_rep='', columns=None, header=True,
-               index=True, encoding=None, quotechar='"', date_format=None, escapechar=None):
-        r"""
-        Write object to a comma-separated values (csv) file.
-
-        .. note:: Spark writes files to HDFS by default.
-            If you want to save the file locally, you need to use path like below
-            `'files:/' + local paths` like 'files:/work/data.csv'. Otherwise,
-            you will write the file to the HDFS path where the spark program starts.
-
-        Parameters
-        ----------
-        path_or_buf : str or file handle, default None
-            File path or object, if None is provided the result is returned as
-            a string.
-        sep : str, default ','
-            String of length 1. Field delimiter for the output file.
-        na_rep : str, default ''
-            Missing data representation.
-        columns : sequence, optional
-            Columns to write.
-        header : bool or list of str, default True
-            Write out the column names. If a list of strings is given it is
-            assumed to be aliases for the column names.
-        index : bool, default True
-            Write row names (index).
-        encoding : str, optional
-            A string representing the encoding to use in the output file,
-            defaults to 'utf-8'.
-        quotechar : str, default '\"'
-            String of length 1. Character used to quote fields.
-        date_format : str, default None
-            Format string for datetime objects.
-        escapechar : str, default None
-            String of length 1. Character used to escape `sep` and `quotechar`
-            when appropriate.
-
-        See Also
-        --------
-        read_csv
-        DataFrame.to_delta
-        DataFrame.to_table
-        DataFrame.to_parquet
-        DataFrame.to_spark_io
-        Examples
-        --------
-        >>> df = ks.DataFrame(dict(
-        ...     date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
-        ...     country=['KR', 'US', 'JP'],
-        ...     code=[1, 2 ,3]), columns=['date', 'country', 'code'])
-        >>> df
-                         date country  code
-        0 2012-01-31 12:00:00      KR     1
-        1 2012-02-29 12:00:00      US     2
-        2 2012-03-31 12:00:00      JP     3
-        >>> df.to_csv(path=r'%s/to_csv/foo.csv' % path)
-        """
-        if columns is not None:
-            data_columns = columns
-        else:
-            data_columns = self._internal.data_columns
-
-        if index:
-            index_columns = self._internal.index_columns
-        else:
-            index_columns = []
-
-        if isinstance(header, list):
-            sdf = self._sdf.select(index_columns +
-                                   [self._internal.scol_for(old_name).alias(new_name)
-                                    for (old_name, new_name) in zip(data_columns, header)])
-            header = True
-        else:
-            sdf = self._sdf.select(index_columns + data_columns)
-
-        sdf.write.csv(path=path_or_buf, sep=sep, nullValue=na_rep, header=header,
-                      encoding=encoding, quote=quotechar, dateFormat=date_format,
-                      charToEscapeQuoteEscaping=escapechar)
-
     def to_frame(self, name=None) -> spark.DataFrame:
         """
         Convert Series to DataFrame.