Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration value: compute.default_index_type #723

Merged
merged 3 commits into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions databricks/koalas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
# For example, this value determines whether the repr() for a dataframe prints out fully or
# just a truncated repr.
"display.max_rows": 1000, # TODO: None should support unlimited.

# This sets the default index type: sequence, distributed and distributed-sequence.
"compute.default_index_type": "sequence",
} # type: Dict[str, Any]


Expand Down
9 changes: 4 additions & 5 deletions databricks/koalas/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"""

from typing import Dict, List, Optional, Tuple, Union
import os
from itertools import accumulate

import numpy as np
Expand All @@ -32,6 +31,7 @@
from pyspark.sql.types import DataType, StructField, StructType, to_arrow_type, LongType

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.config import get_option
from databricks.koalas.typedef import infer_pd_series_spark_type
from databricks.koalas.utils import column_index_level, default_session, lazy_property, scol_for

Expand Down Expand Up @@ -417,8 +417,7 @@ def attach_default_index(sdf):
This method attaches a default index to a Spark DataFrame. Spark does not have the
notion of an index, so a corresponding column should be generated.

There are three types of default index that can be controlled by `DEFAULT_INDEX`
environment variable.
There are three types of default index that can be configured by `compute.default_index_type`.

- sequence: It implements a sequence that increases one by one, by Window function without
specifying partition. Therefore, it ends up with whole partition in single node.
Expand Down Expand Up @@ -474,7 +473,7 @@ def attach_default_index(sdf):
[25769803776, 60129542144, 94489280512]

"""
default_index_type = os.environ.get("DEFAULT_INDEX", "sequence")
default_index_type = get_option("compute.default_index_type")
if default_index_type == "sequence":
sequential_index = F.row_number().over(
Window.orderBy(F.monotonically_increasing_id().asc())) - 1
Expand Down Expand Up @@ -521,7 +520,7 @@ def default_index(pdf):
return sdf.select(
F.monotonically_increasing_id().alias("__index_level_0__"), *scols)
else:
raise ValueError("'DEFAULT_INDEX' environment variable should be one of 'sequence',"
raise ValueError("'compute.default_index_type' should be one of 'sequence',"
" 'distributed-sequence' and 'distributed'")

@lazy_property
Expand Down
18 changes: 7 additions & 11 deletions databricks/koalas/tests/test_default_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

import pandas as pd

from databricks import koalas as ks
from databricks.koalas.config import set_option, reset_option
from databricks.koalas.testing.utils import ReusedSQLTestCase, TestUtils


Expand All @@ -26,13 +25,12 @@ class OneByOneDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(OneByOneDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'sequence'
set_option('compute.default_index_type', 'sequence')

@classmethod
def tearDownClass(cls):
super(OneByOneDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand All @@ -44,13 +42,12 @@ class DistributedOneByOneDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(DistributedOneByOneDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'distributed-sequence'
set_option('compute.default_index_type', 'distributed-sequence')

@classmethod
def tearDownClass(cls):
super(DistributedOneByOneDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand All @@ -62,13 +59,12 @@ class DistributedDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(DistributedDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'distributed'
set_option('compute.default_index_type', 'distributed')

@classmethod
def tearDownClass(cls):
super(DistributedDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand Down