Skip to content

Commit

Permalink
Add configuration value: compute.default_index_type (#723)
Browse files Browse the repository at this point in the history
Related to #722: I added the `compute.default_index_type` configuration value.
  • Loading branch information
itholic authored and HyukjinKwon committed Aug 30, 2019
1 parent a1efa61 commit 4228bad
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 16 deletions.
3 changes: 3 additions & 0 deletions databricks/koalas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
# For example, this value determines whether the repr() for a dataframe prints out fully or
# just a truncated repr.
"display.max_rows": 1000, # TODO: None should support unlimited.

# This sets the default index type: sequence, distributed and distributed-sequence.
"compute.default_index_type": "sequence",
} # type: Dict[str, Any]


Expand Down
9 changes: 4 additions & 5 deletions databricks/koalas/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"""

from typing import Dict, List, Optional, Tuple, Union
import os
from itertools import accumulate

import numpy as np
Expand All @@ -32,6 +31,7 @@
from pyspark.sql.types import DataType, StructField, StructType, to_arrow_type, LongType

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.config import get_option
from databricks.koalas.typedef import infer_pd_series_spark_type
from databricks.koalas.utils import column_index_level, default_session, lazy_property, scol_for

Expand Down Expand Up @@ -417,8 +417,7 @@ def attach_default_index(sdf):
This method attaches a default index to Spark DataFrame. Spark does not have the index
notion so corresponding column should be generated.
There are three types of default index that can be controlled by `DEFAULT_INDEX`
environment variable.
There are three types of default index that can be configured by `compute.default_index_type`
- sequence: It implements a sequence that increases one by one, by Window function without
specifying partition. Therefore, it ends up with whole partition in single node.
Expand Down Expand Up @@ -474,7 +473,7 @@ def attach_default_index(sdf):
[25769803776, 60129542144, 94489280512]
"""
default_index_type = os.environ.get("DEFAULT_INDEX", "sequence")
default_index_type = get_option("compute.default_index_type")
if default_index_type == "sequence":
sequential_index = F.row_number().over(
Window.orderBy(F.monotonically_increasing_id().asc())) - 1
Expand Down Expand Up @@ -521,7 +520,7 @@ def default_index(pdf):
return sdf.select(
F.monotonically_increasing_id().alias("__index_level_0__"), *scols)
else:
raise ValueError("'DEFAULT_INDEX' environment variable should be one of 'sequence',"
raise ValueError("'compute.default_index_type' should be one of 'sequence',"
" 'distributed-sequence' and 'distributed'")

@lazy_property
Expand Down
18 changes: 7 additions & 11 deletions databricks/koalas/tests/test_default_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os

import pandas as pd

from databricks import koalas as ks
from databricks.koalas.config import set_option, reset_option
from databricks.koalas.testing.utils import ReusedSQLTestCase, TestUtils


Expand All @@ -26,13 +25,12 @@ class OneByOneDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(OneByOneDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'sequence'
set_option('compute.default_index_type', 'sequence')

@classmethod
def tearDownClass(cls):
super(OneByOneDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand All @@ -44,13 +42,12 @@ class DistributedOneByOneDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(DistributedOneByOneDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'distributed-sequence'
set_option('compute.default_index_type', 'distributed-sequence')

@classmethod
def tearDownClass(cls):
super(DistributedOneByOneDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand All @@ -62,13 +59,12 @@ class DistributedDefaultIndexTest(ReusedSQLTestCase, TestUtils):
@classmethod
def setUpClass(cls):
super(DistributedDefaultIndexTest, cls).setUpClass()
cls.default_index = os.environ.get('DEFAULT_INDEX', 'sequence')
os.environ['DEFAULT_INDEX'] = 'distributed'
set_option('compute.default_index_type', 'distributed')

@classmethod
def tearDownClass(cls):
super(DistributedDefaultIndexTest, cls).tearDownClass()
os.environ['DEFAULT_INDEX'] = cls.default_index
reset_option('compute.default_index_type')

def test_default_index(self):
sdf = self.spark.range(1000)
Expand Down

0 comments on commit 4228bad

Please sign in to comment.