Commit d8c75f2

resolve conflicts

itholic committed Dec 10, 2019
2 parents 4c7f87f + 63d42c3 commit d8c75f2
Showing 36 changed files with 1,495 additions and 353 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -37,7 +37,7 @@ matrix:
- JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
- SPARK_VERSION=2.4.4
- PANDAS_VERSION=0.25.3
- PYARROW_VERSION=0.15.1
- PYARROW_VERSION=0.14.1

before_install:
- ./dev/download_travis_dependencies.sh
2 changes: 2 additions & 0 deletions README.md
@@ -13,6 +13,8 @@
<a href="https://github.com/databricks/koalas/issues">Issues</a>
·
<a href="https://groups.google.com/forum/#!forum/koalas-dev">Mailing list</a>
<br/>
<strong><a href="https://www.gofundme.com/f/help-thirsty-koalas-devastated-by-recent-fires">Help Thirsty Koalas Devastated by Recent Fires</a></strong>
</p>

The Koalas project makes data scientists more productive when interacting with big data, by implementing the pandas DataFrame API on top of Apache Spark.
2 changes: 1 addition & 1 deletion databricks/koalas/__init__.py
@@ -56,7 +56,7 @@ def assert_pyspark_version():
__all__ = ['read_csv', 'read_parquet', 'to_datetime', 'from_pandas',
'get_dummies', 'DataFrame', 'Series', 'Index', 'MultiIndex', 'pandas_wraps',
'sql', 'range', 'concat', 'melt', 'get_option', 'set_option', 'reset_option',
'read_sql_table', 'read_sql_query', 'read_sql', 'options', 'NamedAgg']
'read_sql_table', 'read_sql_query', 'read_sql', 'options', 'option_context', 'NamedAgg']


def _auto_patch():
249 changes: 231 additions & 18 deletions databricks/koalas/base.py
@@ -19,7 +19,7 @@
"""

from functools import wraps
from typing import Union
from typing import Union, Callable, Any

import numpy as np
import pandas as pd
@@ -30,9 +30,32 @@
from pyspark.sql.functions import monotonically_increasing_id

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.internal import _InternalFrame
from databricks.koalas import numpy_compat
from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.typedef import pandas_wraps, spark_type_to_pandas_dtype
from databricks.koalas.utils import align_diff_series, scol_for, validate_axis
from databricks.koalas.frame import DataFrame


def booleanize_null(left_scol, scol, f):
"""
Booleanize null values in a Spark Column so that boolean results never contain nulls.
"""
comp_ops = [getattr(spark.Column, '__{}__'.format(comp_op))
for comp_op in ['eq', 'ne', 'lt', 'le', 'ge', 'gt']]

if f in comp_ops:
# if `f` is "!=", fill nulls with True; otherwise fill them with False
filler = f == spark.Column.__ne__
scol = F.when(scol.isNull(), filler).otherwise(scol)

elif f == spark.Column.__or__:
scol = F.when(left_scol.isNull() | scol.isNull(), False).otherwise(scol)

elif f == spark.Column.__and__:
scol = F.when(scol.isNull(), False).otherwise(scol)

return scol
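
For context, a rough sketch of the problem this helper addresses (illustrative only, not part of the change; the session variable below is hypothetical): Spark propagates nulls through comparisons, so a boolean result column can contain nulls where pandas semantics expect plain True/False.

from pyspark.sql import SparkSession, functions as F

spark_session = SparkSession.builder.getOrCreate()
sdf = spark_session.createDataFrame([(1.0,), (None,)], ["x"])

# Raw Spark comparison: the null row stays null.
raw = sdf.select((F.col("x") != 1).alias("neq"))     # rows: [False, null]

# booleanize_null-style fix for __ne__: nulls are filled with True.
fixed = sdf.select(
    F.when(F.col("x").isNull(), True)
    .otherwise(F.col("x") != 1)
    .alias("neq"))                                   # rows: [False, True]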


def _column_op(f):
@@ -56,27 +79,14 @@ def wrapper(self, *args):
# Same DataFrame anchors
args = [arg._scol if isinstance(arg, IndexOpsMixin) else arg for arg in args]
scol = f(self._scol, *args)

# check if `f` is a comparison operator
comp_ops = ['eq', 'ne', 'lt', 'le', 'ge', 'gt']
is_comp_op = any(f == getattr(spark.Column, '__{}__'.format(comp_op))
for comp_op in comp_ops)

if is_comp_op:
filler = f == spark.Column.__ne__
scol = F.when(scol.isNull(), filler).otherwise(scol)

elif f == spark.Column.__or__:
scol = F.when(self._scol.isNull() | scol.isNull(), False).otherwise(scol)

elif f == spark.Column.__and__:
scol = F.when(scol.isNull(), False).otherwise(scol)
scol = booleanize_null(self._scol, scol, f)

return self._with_new_scol(scol)
else:
# Different DataFrame anchors
def apply_func(this_column, *that_columns):
return f(this_column, *that_columns)
scol = f(this_column, *that_columns)
return booleanize_null(this_column, scol, f)

return align_diff_series(apply_func, self, *args, how="full")
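
When the operands are anchored to different DataFrames, the result goes through ``align_diff_series``; as the existing utilities stand, that path is guarded by the ``compute.ops_on_diff_frames`` option. A hedged, indicative doctest (output approximate, hence the skip):

>>> kdf1 = ks.DataFrame({'a': [1, 2, 3]})
>>> kdf2 = ks.DataFrame({'a': [10, 20, 30]})
>>> with ks.option_context('compute.ops_on_diff_frames', True):
...     (kdf1.a + kdf2.a).sort_index()  # doctest: +SKIP
0    11
1    22
2    33
Name: a, dtype: int64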

@@ -135,6 +145,8 @@ def _with_new_scol(self, scol: spark.Column) -> IndexOpsMixin
Creates new object with the new column
"""
def __init__(self, internal: _InternalFrame, kdf):
assert internal is not None
assert kdf is not None and isinstance(kdf, DataFrame)
self._internal = internal # type: _InternalFrame
self._kdf = kdf

@@ -213,6 +225,23 @@ def __rfloordiv__(self, other):
__rand__ = _column_op(spark.Column.__rand__)
__ror__ = _column_op(spark.Column.__ror__)

# NDArray Compat
def __array_ufunc__(self, ufunc: Callable, method: str, *inputs: Any, **kwargs: Any):
# Try dunder methods first.
result = numpy_compat.maybe_dispatch_ufunc_to_dunder_op(
self, ufunc, method, *inputs, **kwargs)

# After that, we try with PySpark APIs.
if result is NotImplemented:
result = numpy_compat.maybe_dispatch_ufunc_to_spark_func(
self, ufunc, method, *inputs, **kwargs)

if result is not NotImplemented:
return result
else:
# TODO: support more APIs?
raise NotImplementedError("Koalas objects currently do not support %s." % ufunc)
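
As a hedged illustration of what this NDArray compat hook enables (not part of the change; output indicative only), NumPy ufuncs applied to a Koalas Series should dispatch to the dunder ops or, failing that, to the equivalent PySpark functions:

>>> import numpy as np
>>> kser = ks.Series([1.0, 4.0, 9.0])
>>> np.sqrt(kser)  # doctest: +SKIP
0    1.0
1    2.0
2    3.0
dtype: float64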

@property
def dtype(self):
"""Return the dtype object of the underlying data.
@@ -760,3 +789,187 @@ def _shift(self, periods, fill_value, part_cols=()):
lag_col = F.lag(col, periods).over(window)
col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)
return self._with_new_scol(col).rename(self.name)
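
For orientation, the public behaviour this windowed helper backs, as a hedged doctest (pandas-like fill semantics assumed; output indicative only):

>>> ks.Series([1, 2, 3]).shift(periods=1, fill_value=0)  # doctest: +SKIP
0    0
1    1
2    2
dtype: int64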

# TODO: Update documentation for the `bins` parameter when it's supported
def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True):
"""
Return a Series containing counts of unique values.

The resulting object will be in descending order so that the
first element is the most frequently-occurring element.
Excludes NA values by default.

Parameters
----------
normalize : boolean, default False
If True then the object returned will contain the relative
frequencies of the unique values.
sort : boolean, default True
Sort by values.
ascending : boolean, default False
Sort in ascending order.
bins : Not Yet Supported
dropna : boolean, default True
Don't include counts of NaN.

Returns
-------
counts : Series

See Also
--------
Series.count: Number of non-NA elements in a Series.

Examples
--------
For Series

>>> df = ks.DataFrame({'x':[0, 0, 1, 1, 1, np.nan]})
>>> df.x.value_counts()  # doctest: +NORMALIZE_WHITESPACE
1.0    3
0.0    2
Name: x, dtype: int64

With `normalize` set to `True`, returns the relative frequency by
dividing all values by the sum of values.

>>> df.x.value_counts(normalize=True)  # doctest: +NORMALIZE_WHITESPACE
1.0    0.6
0.0    0.4
Name: x, dtype: float64

**dropna**

With `dropna` set to `False` we can also see NaN index values.

>>> df.x.value_counts(dropna=False)  # doctest: +NORMALIZE_WHITESPACE
1.0    3
0.0    2
NaN    1
Name: x, dtype: int64
For Index

>>> from databricks.koalas.indexes import Index
>>> idx = Index([3, 1, 2, 3, 4, np.nan])
>>> idx
Float64Index([3.0, 1.0, 2.0, 3.0, 4.0, nan], dtype='float64')

>>> idx.value_counts().sort_index()
1.0    1
2.0    1
3.0    2
4.0    1
Name: count, dtype: int64

**sort**

With `sort` set to `False`, the result is not sorted by counts.

>>> idx.value_counts(sort=False).sort_index()
1.0    1
2.0    1
3.0    2
4.0    1
Name: count, dtype: int64

**normalize**

With `normalize` set to `True`, returns the relative frequency by
dividing all values by the sum of values.

>>> idx.value_counts(normalize=True).sort_index()
1.0    0.2
2.0    0.2
3.0    0.4
4.0    0.2
Name: count, dtype: float64

**dropna**

With `dropna` set to `False` we can also see NaN index values.

>>> idx.value_counts(dropna=False).sort_index()  # doctest: +SKIP
1.0    1
2.0    1
3.0    2
4.0    1
NaN    1
Name: count, dtype: int64
For MultiIndex.

>>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'],
...                       ['speed', 'weight', 'length']],
...                      [[0, 0, 0, 1, 1, 1, 2, 2, 2],
...                       [1, 1, 1, 1, 1, 2, 1, 2, 2]])
>>> s = ks.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
>>> s.index  # doctest: +SKIP
MultiIndex([(  'lama', 'weight'),
            (  'lama', 'weight'),
            (  'lama', 'weight'),
            (   'cow', 'weight'),
            (   'cow', 'weight'),
            (   'cow', 'length'),
            ('falcon', 'weight'),
            ('falcon', 'length'),
            ('falcon', 'length')],
           )

>>> s.index.value_counts().sort_index()
(cow, length)       1
(cow, weight)       2
(falcon, length)    2
(falcon, weight)    1
(lama, weight)      3
Name: count, dtype: int64

>>> s.index.value_counts(normalize=True).sort_index()
(cow, length)       0.111111
(cow, weight)       0.222222
(falcon, length)    0.222222
(falcon, weight)    0.111111
(lama, weight)      0.333333
Name: count, dtype: float64

If the Index has a name, the name is kept in the result.

>>> idx = Index([0, 0, 0, 1, 1, 2, 3], name='koalas')
>>> idx.value_counts().sort_index()
0    3
1    2
2    1
3    1
Name: koalas, dtype: int64
"""
from databricks.koalas.series import Series, _col
if bins is not None:
raise NotImplementedError("value_counts currently does not support bins")

if dropna:
sdf_dropna = self._internal._sdf.dropna()
else:
sdf_dropna = self._internal._sdf
index_name = SPARK_INDEX_NAME_FORMAT(0)
sdf = sdf_dropna.groupby(self._scol.alias(index_name)).count()
if sort:
if ascending:
sdf = sdf.orderBy(F.col('count'))
else:
sdf = sdf.orderBy(F.col('count').desc())

if normalize:
sum = sdf_dropna.count()
sdf = sdf.withColumn('count', F.col('count') / F.lit(sum))

column_index = self._internal.column_index
if (column_index[0] is None) or (None in column_index[0]):
internal = _InternalFrame(sdf=sdf,
index_map=[(index_name, None)],
column_scols=[scol_for(sdf, 'count')])
else:
internal = _InternalFrame(sdf=sdf,
index_map=[(index_name, None)],
column_index=column_index,
column_scols=[scol_for(sdf, 'count')],
column_index_names=self._internal.column_index_names)

return _col(DataFrame(internal))
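
Roughly speaking, the implementation above reduces to a Spark ``groupBy``/``count`` with an optional sort and normalization. A sketch in plain PySpark terms (illustrative only; assumes a Spark DataFrame ``sdf`` with a column ``x``, and the defaults ``sort=True``, ``ascending=False``):

from pyspark.sql import functions as F

counts = sdf.groupBy(F.col("x")).count()               # one row per distinct value
counts = counts.orderBy(F.col("count").desc())         # sort=True, ascending=False
total = sdf.count()                                    # denominator when normalize=True
freqs = counts.withColumn("count", F.col("count") / F.lit(total))
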
31 changes: 30 additions & 1 deletion databricks/koalas/config.py
@@ -17,6 +17,7 @@
"""
Infrastructure of options for Koalas.
"""
from contextlib import contextmanager
import json
from typing import Union, Any, Tuple, Callable, List, Dict

@@ -25,7 +26,7 @@
from databricks.koalas.utils import default_session


__all__ = ['get_option', 'set_option', 'reset_option', 'options']
__all__ = ['get_option', 'set_option', 'reset_option', 'options', 'option_context']


class Option:
@@ -303,6 +304,34 @@ def reset_option(key: str) -> None:
default_session().conf.unset(_key_format(key))


@contextmanager
def option_context(*args):
"""
Context manager to temporarily set options in the `with` statement context.
You need to invoke it as ``option_context(pat, val, [(pat, val), ...])``.

Examples
--------
>>> with option_context('display.max_rows', 10, 'compute.max_rows', 5):
... print(get_option('display.max_rows'), get_option('compute.max_rows'))
10 5
>>> print(get_option('display.max_rows'), get_option('compute.max_rows'))
1000 1000
"""
if len(args) == 0 or len(args) % 2 != 0:
raise ValueError('Need to invoke as option_context(pat, val, [(pat, val), ...]).')
opts = dict(zip(args[::2], args[1::2]))
orig_opts = {key: get_option(key) for key in opts}
try:
for key, value in opts.items():
set_option(key, value)
yield
finally:
for key, value in orig_opts.items():
set_option(key, value)
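
A small usage note (hypothetical example, not part of the change): because the originals are restored in the ``finally`` clause, the previous values come back even when the body raises.

>>> import databricks.koalas as ks
>>> try:
...     with ks.option_context('display.max_rows', 10):
...         raise ValueError("boom")
... except ValueError:
...     pass
>>> ks.get_option('display.max_rows')
1000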


def _check_option(key: str) -> None:
if key not in _options_dict:
raise OptionError(