
Commit

Merge branch 'master' of github.com:databricks/koalas into feature/impl-index_map

awdavidson committed Apr 7, 2021
2 parents 694650a + 0fd088e commit f845001
Showing 17 changed files with 686 additions and 78 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1,3 +1,5 @@
**NOTE**: Koalas supports Apache Spark 3.1 and below, as it will be [officially included in PySpark in the upcoming Apache Spark 3.2](http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-SPIP-Support-pandas-API-layer-on-PySpark-td30996.html). This repository is now in maintenance mode. For Apache Spark 3.2 and above, please use PySpark directly.

<p align="center">
<img src="https://raw.githubusercontent.com/databricks/koalas/master/icons/koalas-logo.png" width="140"/>
</p>
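As context for the maintenance-mode note added to README.md above, here is a minimal sketch of what the suggested migration looks like; it assumes a Spark 3.2+ environment with an active SparkSession, where the ported API ships as `pyspark.pandas`:

```python
# Koalas, for Apache Spark 3.1 and below (this repository):
import databricks.koalas as ks

kdf = ks.DataFrame({"x": [1, 2, 3]})
print(kdf["x"].sum())

# pandas API on Spark, for Apache Spark 3.2 and above (the suggested replacement):
import pyspark.pandas as ps

psdf = ps.DataFrame({"x": [1, 2, 3]})
print(psdf["x"].sum())
```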
126 changes: 92 additions & 34 deletions databricks/koalas/accessors.py
@@ -18,21 +18,22 @@
"""
import inspect
from distutils.version import LooseVersion
from typing import Any, Tuple, Union, TYPE_CHECKING, cast
from typing import Any, Optional, Tuple, Union, TYPE_CHECKING, cast
import types

import numpy as np # noqa: F401
import pandas as pd
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType

from databricks.koalas.internal import (
InternalFrame,
SPARK_INDEX_NAME_FORMAT,
SPARK_DEFAULT_SERIES_NAME,
)
from databricks.koalas.typedef import infer_return_type, DataFrameType, SeriesType
from databricks.koalas.typedef import infer_return_type, DataFrameType, ScalarType, SeriesType
from databricks.koalas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
from databricks.koalas.utils import (
is_name_like_value,
@@ -556,21 +557,27 @@ def pandas_concat(series):
# The input can only be a DataFrame for struct from Spark 3.0.
# This works around to make the input as a frame. See SPARK-27240
pdf = pd.concat(series, axis=1)
pdf = pdf.rename(columns=dict(zip(pdf.columns, names)))
pdf.columns = names
return pdf

def apply_func(pdf):
return func(pdf).to_frame()

def pandas_extract(pdf, name):
# This is for output to work around a DataFrame for struct
# from Spark 3.0. See SPARK-23836
return pdf[name]

def pandas_series_func(f):
def pandas_series_func(f, by_pass):
ff = f
return lambda *series: ff(pandas_concat(series))
if by_pass:
return lambda *series: first_series(ff(*series))
else:
return lambda *series: first_series(ff(pandas_concat(series)))

def pandas_frame_func(f):
def pandas_frame_func(f, field_name):
ff = f
return lambda *series: pandas_extract(ff(pandas_concat(series)), field.name)
return lambda *series: pandas_extract(ff(pandas_concat(series)), field_name)

if should_infer_schema:
# Here we execute with the first 1000 to get the return type.
@@ -588,12 +595,21 @@ def pandas_frame_func(f):
kdf_or_kser = ks.from_pandas(transformed)

if isinstance(kdf_or_kser, ks.Series):
kser = kdf_or_kser
kser = cast(ks.Series, kdf_or_kser)

spark_return_type = force_decimal_precision_scale(
as_nullable_spark_type(kser.spark.data_type)
)
return_schema = StructType(
[StructField(SPARK_DEFAULT_SERIES_NAME, spark_return_type)]
)
output_func = GroupBy._make_pandas_df_builder_func(
self._kdf, apply_func, return_schema, retain_index=False
)

pudf = pandas_udf(
func if should_by_pass else pandas_series_func(func),
returnType=force_decimal_precision_scale(
as_nullable_spark_type(kser.spark.data_type)
),
pandas_series_func(output_func, should_by_pass),
returnType=spark_return_type,
functionType=PandasUDFType.SCALAR,
)
columns = self._kdf._internal.spark_columns
@@ -610,11 +626,11 @@ def pandas_frame_func(f):
)
return first_series(DataFrame(internal))
else:
kdf = kdf_or_kser
kdf = cast(DataFrame, kdf_or_kser)
if len(pdf) <= limit:
# only do the short cut when it returns a frame to avoid
# operations on different dataframes in case of series.
return cast(ks.DataFrame, kdf)
return kdf

# Force nullability.
return_schema = force_decimal_precision_scale(
@@ -642,7 +658,7 @@ def pandas_frame_func(f):
for field in return_schema.fields:
applied.append(
pandas_udf(
pandas_frame_func(output_func),
pandas_frame_func(output_func, field.name),
returnType=field.dataType,
functionType=PandasUDFType.SCALAR,
)(*columns).alias(field.name)
@@ -659,11 +675,19 @@ def pandas_frame_func(f):
"hints; however, the return type was %s." % return_sig
)
if is_return_series:
return_schema = cast(SeriesType, return_type).spark_type
spark_return_type = force_decimal_precision_scale(
as_nullable_spark_type(cast(SeriesType, return_type).spark_type)
)
return_schema = StructType(
[StructField(SPARK_DEFAULT_SERIES_NAME, spark_return_type)]
)
output_func = GroupBy._make_pandas_df_builder_func(
self._kdf, apply_func, return_schema, retain_index=False
)

pudf = pandas_udf(
func if should_by_pass else pandas_series_func(func),
returnType=return_schema,
pandas_series_func(output_func, should_by_pass),
returnType=spark_return_type,
functionType=PandasUDFType.SCALAR,
)
columns = self._kdf._internal.spark_columns
@@ -674,7 +698,7 @@ def pandas_frame_func(f):
SPARK_DEFAULT_SERIES_NAME
)
],
data_dtypes=[None],
data_dtypes=[cast(SeriesType, return_type).dtype],
column_label_names=None,
)
return first_series(DataFrame(internal))
@@ -703,13 +727,18 @@ def pandas_frame_func(f):
for field in return_schema.fields:
applied.append(
pandas_udf(
pandas_frame_func(output_func),
pandas_frame_func(output_func, field.name),
returnType=field.dataType,
functionType=PandasUDFType.SCALAR,
)(*columns).alias(field.name)
)
sdf = self_applied._internal.spark_frame.select(*applied)
return DataFrame(sdf)
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=None,
data_dtypes=cast(DataFrameType, return_type).dtypes,
)
return DataFrame(internal)


class KoalasSeriesMethods(object):
@@ -829,7 +858,7 @@ def transform_batch(self, func, *args, **kwargs) -> "Series":
# Falls back to schema inference if it fails to get signature.
pass

return_schema = None
return_type = None
if return_sig is not None:
# Extract the signature arguments from this function.
sig_return = infer_return_type(func)
@@ -838,34 +867,63 @@ def transform_batch(self, func, *args, **kwargs) -> "Series":
"Expected the return type of this function to be of type column,"
" but found type {}".format(sig_return)
)
return_schema = cast(SeriesType, sig_return).spark_type
return_type = cast(SeriesType, sig_return)

ff = func
func = lambda o: ff(o, *args, **kwargs)
return self._transform_batch(func, return_schema)
return self._transform_batch(lambda c: func(c, *args, **kwargs), return_type)

def _transform_batch(self, func, return_schema):
from databricks.koalas.series import Series
def _transform_batch(self, func, return_type: Optional[Union[SeriesType, ScalarType]]):
from databricks.koalas.groupby import GroupBy
from databricks.koalas.series import Series, first_series
from databricks import koalas as ks

if not isinstance(func, types.FunctionType):
f = func
func = lambda *args, **kwargs: f(*args, **kwargs)

if return_schema is None:
if return_type is None:
# TODO: In this case, it avoids the shortcut for now (but only infers schema)
# because it returns a series from a different DataFrame and it has a different
# anchor. We should fix this to allow the shortcut or only allow to infer
# schema.
limit = ks.get_option("compute.shortcut_limit")
pser = self._kser.head(limit)._to_internal_pandas()
pser = self._kser.head(limit + 1)._to_internal_pandas()
transformed = pser.transform(func)
kser = Series(transformed)
kser = Series(transformed) # type: Series
spark_return_type = force_decimal_precision_scale(
as_nullable_spark_type(kser.spark.data_type)
)
dtype = kser.dtype
else:
spark_return_type = return_schema
spark_return_type = return_type.spark_type
dtype = return_type.dtype

pudf = pandas_udf(func, returnType=spark_return_type, functionType=PandasUDFType.SCALAR)
return self._kser._with_new_scol(scol=pudf(self._kser.spark.column)) # TODO: dtype?
kdf = self._kser.to_frame()
columns = kdf._internal.spark_column_names

def pandas_concat(series):
# The input can only be a DataFrame for struct from Spark 3.0.
# This works around to make the input as a frame. See SPARK-27240
pdf = pd.concat(series, axis=1)
pdf.columns = columns
return pdf

def apply_func(pdf):
return func(first_series(pdf)).to_frame()

return_schema = StructType([StructField(SPARK_DEFAULT_SERIES_NAME, spark_return_type)])
output_func = GroupBy._make_pandas_df_builder_func(
kdf, apply_func, return_schema, retain_index=False
)

pudf = pandas_udf(
lambda *series: first_series(output_func(pandas_concat(series))),
returnType=spark_return_type,
functionType=PandasUDFType.SCALAR,
)

return self._kser._with_new_scol(
scol=pudf(*kdf._internal.spark_columns).alias(
self._kser._internal.spark_column_names[0]
),
dtype=dtype,
)
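The accessors.py changes above rework `KoalasSeriesMethods._transform_batch` to route through `GroupBy._make_pandas_df_builder_func` rather than a bare pandas UDF, so the returned Series carries a proper dtype. A minimal sketch of the public entry point this affects, assuming Koalas on a local Spark session (the `plus_one` helper is illustrative, not part of the diff):

```python
import databricks.koalas as ks

kser = ks.Series([1, 2, 3])

# No return-type annotation: the dtype is inferred by running the function on the
# first `compute.shortcut_limit` + 1 rows, then applying it as a pandas UDF.
print(kser.koalas.transform_batch(lambda pser: pser + 1))

# With an annotation, the declared type is used directly and the inference pass is skipped.
def plus_one(pser) -> ks.Series[int]:
    return pser + 1

print(kser.koalas.transform_batch(plus_one))
```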
31 changes: 31 additions & 0 deletions databricks/koalas/base.py
@@ -330,6 +330,10 @@ def __add__(self, other) -> Union["Series", "Index"]:
or isinstance(other, str)
):
raise TypeError("string addition can only be applied to string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("addition can not be applied to date times.")

if isinstance(self.spark.data_type, StringType):
# Concatenate string columns
if isinstance(other, IndexOpsMixin) and isinstance(other.spark.data_type, StringType):
@@ -390,6 +394,9 @@ def __mul__(self, other) -> Union["Series", "Index"]:
if isinstance(other, str):
raise TypeError("multiplication can not be applied to a string literal.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("multiplication can not be applied to date times.")

if (
isinstance(self.spark.data_type, IntegralType)
and isinstance(other, IndexOpsMixin)
@@ -434,6 +441,9 @@ def __truediv__(self, other) -> Union["Series", "Index"]:
):
raise TypeError("division can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("division can not be applied to date times.")

def truediv(left, right):
return F.when(F.lit(right != 0) | F.lit(right).isNull(), left.__div__(right)).otherwise(
F.when(F.lit(left == np.inf) | F.lit(left == -np.inf), left).otherwise(
@@ -451,6 +461,9 @@ def __mod__(self, other) -> Union["Series", "Index"]:
):
raise TypeError("modulo can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("modulo can not be applied to date times.")

def mod(left, right):
return ((left % right) + right) % right

@@ -461,6 +474,9 @@ def __radd__(self, other) -> Union["Series", "Index"]:
if not isinstance(self.spark.data_type, StringType) and isinstance(other, str):
raise TypeError("string addition can only be applied to string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("addition can not be applied to date times.")

if isinstance(self.spark.data_type, StringType):
if isinstance(other, str):
return self._with_new_scol(
@@ -507,6 +523,9 @@ def __rmul__(self, other) -> Union["Series", "Index"]:
if isinstance(other, str):
raise TypeError("multiplication can not be applied to a string literal.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("multiplication can not be applied to date times.")

if isinstance(self.spark.data_type, StringType):
if isinstance(other, int):
return column_op(SF.repeat)(self, other)
@@ -521,6 +540,9 @@ def __rtruediv__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("division can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("division can not be applied to date times.")

def rtruediv(left, right):
return F.when(left == 0, F.lit(np.inf).__div__(right)).otherwise(
F.lit(right).__truediv__(left)
@@ -552,6 +574,9 @@ def __floordiv__(self, other) -> Union["Series", "Index"]:
):
raise TypeError("division can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("division can not be applied to date times.")

def floordiv(left, right):
return F.when(F.lit(right is np.nan), np.nan).otherwise(
F.when(
@@ -569,6 +594,9 @@ def __rfloordiv__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("division can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("division can not be applied to date times.")

def rfloordiv(left, right):
return F.when(F.lit(left == 0), F.lit(np.inf).__div__(right)).otherwise(
F.when(F.lit(left) == np.nan, np.nan).otherwise(F.floor(F.lit(right).__div__(left)))
@@ -580,6 +608,9 @@ def __rmod__(self, other) -> Union["Series", "Index"]:
if isinstance(self.spark.data_type, StringType) or isinstance(other, str):
raise TypeError("modulo can not be applied on string series or literals.")

if isinstance(self.spark.data_type, TimestampType):
raise TypeError("modulo can not be applied to date times.")

def rmod(left, right):
return ((right % left) + left) % left

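The base.py additions above make the arithmetic dunders fail fast for timestamp operands. A small sketch of the resulting behavior, assuming a Koalas build that includes this change (the printed messages come from the new `TypeError` calls in the diff):

```python
import pandas as pd
import databricks.koalas as ks

kser = ks.Series(pd.to_datetime(["2021-04-07", "2021-04-08"]))

try:
    kser + 1
except TypeError as e:
    print(e)  # "addition can not be applied to date times."

try:
    kser * 2
except TypeError as e:
    print(e)  # "multiplication can not be applied to date times."
```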