Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration value: compute.default_index_type #723

Merged
merged 3 commits into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions databricks/koalas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@
# For example, this value determines whether the repr() for a dataframe prints out fully or
# just a truncated repr.
"display.max_rows": 1000, # TODO: None should support unlimited.

# This sets the default index type
# There are three types of default index that can be configured by the option
# `compute.default_index_type`.
# - sequence: It implements a sequence that increases one by one, by Window function without
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved
# specifying partition. Therefore, it ends up with whole partition in single node.
# This index type should be avoided when the data is large. This is default.
# - distributed: It implements a monotonically increasing sequence simply by using
# Spark's `monotonically_increasing_id` function. If the index does not have to be
# a sequence that increases one by one, this index should be used.
#                Performance-wise, this index has almost no penalty compared to
# other index types. Note that we cannot use this type of index for combining
# two dataframes because it is not guaranteed to have the same indexes in two
# dataframes.
# - distributed-sequence: It implements a sequence that increases one by one, by group-by and
# group-map approach. It still generates the sequential index globally.
# If the default index must be the sequence in a large dataset, this
# index has to be used.
# Note that if more data are added to the data source after creating this index,
# then it does not guarantee the sequential index.
"compute.default_index_type": "sequence"
} # type: Dict[str, Any]


Expand Down
9 changes: 4 additions & 5 deletions databricks/koalas/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"""

from typing import Dict, List, Optional, Tuple, Union
import os
from itertools import accumulate

import numpy as np
Expand All @@ -32,6 +31,7 @@
from pyspark.sql.types import DataType, StructField, StructType, to_arrow_type, LongType

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.config import get_option
from databricks.koalas.typedef import infer_pd_series_spark_type
from databricks.koalas.utils import column_index_level, default_session, lazy_property, scol_for

Expand Down Expand Up @@ -417,8 +417,7 @@ def attach_default_index(sdf):
This method attaches a default index to Spark DataFrame. Spark does not have the index
notion so corresponding column should be generated.

There are three types of default index that can be controlled by `DEFAULT_INDEX`
environment variable.
There are three types of default index that can be configured by the option `compute.default_index_type`.

- sequence: It implements a sequence that increases one by one, by Window function without
specifying partition. Therefore, it ends up with whole partition in single node.
Expand Down Expand Up @@ -474,7 +473,7 @@ def attach_default_index(sdf):
[25769803776, 60129542144, 94489280512]

"""
default_index_type = os.environ.get("DEFAULT_INDEX", "sequence")
default_index_type = get_option("compute.default_index_type")
if default_index_type == "sequence":
sequential_index = F.row_number().over(
Window.orderBy(F.monotonically_increasing_id().asc())) - 1
Expand Down Expand Up @@ -521,7 +520,7 @@ def default_index(pdf):
return sdf.select(
F.monotonically_increasing_id().alias("__index_level_0__"), *scols)
else:
raise ValueError("'DEFAULT_INDEX' environment variable should be one of 'sequence',"
raise ValueError("'compute.default_index_type' should be one of 'sequence',"
" 'distributed-sequence' and 'distributed'")

@lazy_property
Expand Down