chore(query): testing tpch/tpcds using 1G scale factor (#17449)
* update

* update

* update
sundy-li authored Feb 14, 2025
1 parent 7e3df33 commit 54fceaa
Showing 33 changed files with 37,577 additions and 4,895 deletions.
7 changes: 7 additions & 0 deletions .github/actions/setup_test/action.yml
@@ -33,6 +33,13 @@ runs:
       with:
         python-version: "3.12"

+    - name: Install Python packages
+      shell: bash
+      run: |
+        pip install rich
+        pip install databend_driver
+        pip install duckdb
     - uses: actions/setup-java@v4
       with:
         distribution: temurin
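These packages are installed because scripts added later in this commit rely on them (duckdb drives the TPC data generation below; rich and databend_driver are presumably used by other test tooling). A minimal smoke test, as a sketch — assuming the importable module names match the pip package names:

import duckdb
import rich
import databend_driver  # assumption: the pip package exposes this module name

print(duckdb.sql("SELECT 42").fetchall())  # [(42,)]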
2 changes: 1 addition & 1 deletion benchmark/tpcds/shell_env.sh
@@ -1,6 +1,6 @@
 #!/usr/bin/env bash

-export MYSQL_DATABASE=${MYSQL_DATABASE:="tpcds"}
+export QUERY_DATABASE=${MYSQL_DATABASE:="tpcds"}
 export QUERY_MYSQL_HANDLER_HOST=${QUERY_MYSQL_HANDLER_HOST:="127.0.0.1"}
 export QUERY_MYSQL_HANDLER_PORT=${QUERY_MYSQL_HANDLER_PORT:="3307"}
 export QUERY_HTTP_HANDLER_PORT=${QUERY_HTTP_HANDLER_PORT:="8000"}
2 changes: 1 addition & 1 deletion benchmark/tpch/shell_env.sh
@@ -1,6 +1,6 @@
 #!/usr/bin/env bash

-export MYSQL_DATABASE=${MYSQL_DATABASE:="tpch"}
+export QUERY_DATABASE=${MYSQL_DATABASE:="tpch"}
 export QUERY_MYSQL_HANDLER_HOST=${QUERY_MYSQL_HANDLER_HOST:="127.0.0.1"}
 export QUERY_MYSQL_HANDLER_PORT=${QUERY_MYSQL_HANDLER_PORT:="3307"}
 export QUERY_HTTP_HANDLER_PORT=${QUERY_HTTP_HANDLER_PORT:="8000"}
2 changes: 1 addition & 1 deletion benchmark/tpch/tpch.sh
@@ -4,7 +4,7 @@
 echo """
 INSTALL tpch;
 LOAD tpch;
-SELECT * FROM dsdgen(sf=1); -- sf can be other values, such as 0.1, 1, 10, ...
+SELECT * FROM dbgen(sf=1); -- sf can be other values, such as 0.1, 1, 10, ...
 EXPORT DATABASE '/tmp/tpch_1/' (FORMAT CSV, DELIMITER '|');
 """ | duckdb
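The one-word fix above corrects a copy-paste slip: in DuckDB the TPC-H extension generates data through dbgen(), while dsdgen() belongs to the TPC-DS extension. A minimal Python sketch of the same flow, using a small scale factor:

import duckdb

con = duckdb.connect(":memory:")
con.install_extension("tpch")
con.load_extension("tpch")
con.execute("CALL dbgen(sf=0.1)")  # dsdgen() would fail here; it is TPC-DS only
print(con.execute("SELECT count(*) FROM lineitem").fetchone())
con.close()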
6 changes: 5 additions & 1 deletion src/query/sql/src/planner/binder/sort.rs
@@ -129,7 +129,7 @@ impl Binder {
                         .to_string(),
                 ));
             } else {
-                let rewrite_scalar = self
+                let mut rewrite_scalar = self
                     .rewrite_scalar_with_replacement(
                         bind_context,
                         &bound_expr,
@@ -148,6 +148,10 @@ impl Binder {
                     )
                     .map_err(|e| ErrorCode::SemanticError(e.message()))?;

+                let mut rewriter =
+                    AggregateRewriter::new(bind_context, self.metadata.clone());
+                rewriter.visit(&mut rewrite_scalar)?;
+
                 if let ScalarExpr::ConstantExpr(..) = rewrite_scalar {
                     continue;
                 }
2 changes: 1 addition & 1 deletion tests/shell_env.sh
@@ -6,7 +6,7 @@ export AWS_ACCESS_KEY_ID=minioadmin
 export AWS_SECRET_ACCESS_KEY=minioadmin
 export AWS_EC2_METADATA_DISABLED=true

-export MYSQL_DATABASE=${MYSQL_DATABASE:="default"}
+export QUERY_DATABASE=${QUERY_DATABASE:="default"}
 export QUERY_MYSQL_HANDLER_HOST=${QUERY_MYSQL_HANDLER_HOST:="127.0.0.1"}
 export QUERY_MYSQL_HANDLER_PORT=${QUERY_MYSQL_HANDLER_PORT:="3307"}
 export QUERY_HTTP_HANDLER_PORT=${QUERY_HTTP_HANDLER_PORT:="8000"}
15 changes: 15 additions & 0 deletions tests/sqllogictests/scripts/prepare_duckdb_tpcds_data.py
@@ -0,0 +1,15 @@
+import duckdb, sys
+
+sf = sys.argv[1]
+
+# Initialize a DuckDB instance
+con = duckdb.connect(":memory:")
+
+con.install_extension("tpcds")
+con.load_extension("tpcds")
+# Execute the commands
+con.execute(f"CALL dsdgen(sf={sf})")
+con.execute(f"EXPORT DATABASE '/tmp/tpcds_{sf}/' (FORMAT CSV, DELIMITER '|')")
+
+# Close the connection
+con.close()
15 changes: 15 additions & 0 deletions tests/sqllogictests/scripts/prepare_duckdb_tpch_data.py
@@ -0,0 +1,15 @@
+import duckdb, sys
+
+sf = sys.argv[1]
+
+# Initialize a DuckDB instance
+con = duckdb.connect(":memory:")
+
+con.install_extension("tpch")
+con.load_extension("tpch")
+# Execute the commands
+con.execute(f"CALL dbgen(sf={sf})")
+con.execute(f"EXPORT DATABASE '/tmp/tpch_{sf}/' (FORMAT CSV, DELIMITER '|')")
+
+# Close the connection
+con.close()
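Both generator scripts take the scale factor as their single positional argument; the preparation shell scripts below invoke them with sf=1. A hypothetical sanity check of the TPC-H export:

# Run after `python prepare_duckdb_tpch_data.py 1`. DuckDB's EXPORT DATABASE
# writes one pipe-delimited CSV per table, plus schema.sql/load.sql, into /tmp/tpch_1/.
import os

print(sorted(os.listdir("/tmp/tpch_1")))  # expect customer.csv, lineitem.csv, ...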
4 changes: 0 additions & 4 deletions tests/sqllogictests/scripts/prepare_spill_data.sh

This file was deleted.

40 changes: 16 additions & 24 deletions tests/sqllogictests/scripts/prepare_tpcds_data.sh
@@ -5,7 +5,8 @@
 # shellcheck disable=SC2034
 target_dir="tests/sqllogictests/"

-db="tpcds"
+db=${1:-"tpcds"}
+

 tables=(
     call_center
@@ -34,39 +35,30 @@ tables=(
     web_site
 )

-# Clear Data
-# shellcheck disable=SC2068
-for t in ${tables[@]}; do
-    echo "DROP TABLE IF EXISTS ${db}.$t" | $BENDSQL_CLIENT_CONNECT
-done
-
-echo "CREATE DATABASE IF NOT EXISTS tpcds" | $BENDSQL_CLIENT_CONNECT
+force=${2:-"1"}
+if [ "$force" == "0" ]; then
+    res=`echo "SELECT COUNT() from ${db}.call_center" | $BENDSQL_CLIENT_CONNECT`
+    if [ "$res" != "0" -a "$res" != "" ]; then
+        echo "Table $db.call_center already exists and is not empty, size: ${res}. Use force=1 to override it."
+        exit 0
+    fi
+fi
+
+echo "CREATE OR REPLACE DATABASE tpcds" | $BENDSQL_CLIENT_CONNECT

 # Create Tables;
 # shellcheck disable=SC2002
 cat ${target_dir}/scripts/tpcds.sql | $BENDSQL_CLIENT_CONNECT

-# download data
-mkdir -p ${target_dir}/data/
-if [ ! -d ${target_dir}/data/tpcds.tar.gz ]; then
-    curl -s -o ${target_dir}/data/tpcds.tar.gz https://ci.databend.com/dataset/stateful/tpcds.tar.gz
-fi
-
-tar -zxf ${target_dir}/data/tpcds.tar.gz -C ${target_dir}/data
-
-# insert data to tables
-# shellcheck disable=SC2068
+python ${target_dir}/scripts/prepare_duckdb_tpcds_data.py 1

 stmt "drop stage if exists s1"
-stmt "create stage s1 url='fs://${PWD}/${target_dir}/'"
+stmt "create stage s1 url='fs:///tmp/tpcds_1/'"

 for t in ${tables[@]}; do
     echo "$t"
-    sub_path="data/data/$t.csv"
-    query "copy into ${db}.${t} from @s1/${sub_path} file_format = (type = CSV skip_header = 0 field_delimiter = '|' record_delimiter = '\n')"
+    query "copy into ${db}.${t} from @s1/${t}.csv file_format = (type = CSV skip_header = 1 field_delimiter = '|' record_delimiter = '\n')"
     query "analyze table $db.$t"
 done

-if [ -d "tests/sqllogictests/data" ]; then
-    rm -rf tests/sqllogictests/data
-fi
+# rm -rf /tmp/tpcds_1
29 changes: 16 additions & 13 deletions tests/sqllogictests/scripts/prepare_tpch_data.sh
@@ -6,6 +6,15 @@
 data_dir="tests/sqllogictests/data"

 db=${1:-"tpch_test"}
+force=${2:-"1"}
+
+if [ "$force" == "0" ]; then
+    res=`echo "SELECT COUNT() from ${db}.nation" | $BENDSQL_CLIENT_CONNECT`
+    if [ "$res" != "0" -a "$res" != "" ]; then
+        echo "Table $db.nation already exists and is not empty, size: ${res}. Use force=1 to override it."
+        exit 0
+    fi
+fi

 echo "DROP DATABASE if EXISTS ${db}" | $BENDSQL_CLIENT_CONNECT
 echo "CREATE DATABASE ${db}" | $BENDSQL_CLIENT_CONNECT
@@ -108,25 +117,19 @@ echo "CREATE TABLE IF NOT EXISTS ${db}.lineitem
     l_comment STRING not null
 ) CLUSTER BY(l_shipdate, l_orderkey)" | $BENDSQL_CLIENT_CONNECT

-#download data
-mkdir -p $data_dir
-if [ ! -d ${data_dir}/tpch.tar.gz ]; then
-    curl -s -o ${data_dir}/tpch.tar.gz https://ci.databend.com/dataset/stateful/tpch.tar.gz
-fi
-
-tar -zxf ${data_dir}/tpch.tar.gz -C $data_dir
+#import data
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+python $CURDIR/prepare_duckdb_tpch_data.py 1

 stmt "drop stage if exists s1"
-stmt "create stage s1 url='fs://${PWD}/${data_dir}/'"
+stmt "create stage s1 url='fs:///tmp/tpch_1/'"

 # insert data to tables
 for t in customer lineitem nation orders partsupp part region supplier; do
     echo "$t"
-    sub_path="tests/suites/0_stateless/13_tpch/data/$t.tbl"
-    stmt "copy into ${db}.$t from @s1/${sub_path} file_format = (type = CSV skip_header = 0 field_delimiter = '|' record_delimiter = '\n')"
+    query "copy into ${db}.$t from @s1/$t.csv force = true file_format = (type = CSV skip_header = 1 field_delimiter = '|' record_delimiter = '\n')"
     query "analyze table $db.$t"
 done

-if [ -d "tests/sqllogictests/data" ]; then
-    rm -rf tests/sqllogictests/data
-fi
+# rm -rf /tmp/tpch_1
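Two option changes in the copy step follow from the new data source: skip_header moved from 0 to 1 because the DuckDB CSV export writes a header row (the old pre-packaged .tbl files had none), and force = true re-loads files even when the stage has already seen their names. A quick sketch confirming the header behavior, assuming a recent duckdb build:

import duckdb

con = duckdb.connect(":memory:")
con.execute("CREATE TABLE region AS SELECT 0 AS r_regionkey, 'AFRICA' AS r_name")
con.execute("EXPORT DATABASE '/tmp/export_demo/' (FORMAT CSV, DELIMITER '|')")
with open("/tmp/export_demo/region.csv") as f:
    print(f.readline().strip())  # header row: r_regionkey|r_name
con.close()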
7 changes: 7 additions & 0 deletions tests/sqllogictests/src/arg.rs
@@ -114,6 +114,13 @@ pub struct SqlLogicTestArgs {
     )]
     pub bench: bool,

+    #[arg(
+        long = "force_load",
+        default_missing_value = "false",
+        help = "The arg is used to force load test data (tpch/tpcds)"
+    )]
+    pub force_load: bool,
+
     // Set specific the database to connect
     #[arg(
         long = "database",
6 changes: 5 additions & 1 deletion tests/sqllogictests/src/main.rs
@@ -259,6 +259,10 @@ async fn run_suits(args: SqlLogicTestArgs, client_type: ClientType) -> Result<()
             .to_str()
             .unwrap()
             .to_string();
+
+        if !file_name.ends_with(".test") {
+            continue;
+        }
         if let Some(ref specific_file) = args.file {
             if !specific_file.split(',').any(|f| f.eq(&file_name)) {
                 continue;
@@ -280,7 +284,7 @@ async fn run_suits(args: SqlLogicTestArgs, client_type: ClientType) -> Result<()

     if !args.bench {
         // lazy load test datas
-        lazy_prepare_data(&lazy_dirs)?;
+        lazy_prepare_data(&lazy_dirs, args.force_load)?;
     }
     // lazy run dictionaries containers
     let _dict_container = lazy_run_dictionary_containers(&lazy_dirs).await?;
30 changes: 12 additions & 18 deletions tests/sqllogictests/src/util.rs
@@ -153,7 +153,6 @@ pub fn get_files(suit: PathBuf) -> Result<Vec<walkdir::Result<DirEntry>>> {
 static PREPARE_TPCH: std::sync::Once = std::sync::Once::new();
 static PREPARE_TPCDS: std::sync::Once = std::sync::Once::new();
 static PREPARE_STAGE: std::sync::Once = std::sync::Once::new();
-static PREPARE_SPILL: std::sync::Once = std::sync::Once::new();
 static PREPARE_WASM: std::sync::Once = std::sync::Once::new();

 #[derive(Eq, Hash, PartialEq)]
@@ -162,13 +161,12 @@ pub enum LazyDir {
     Tpcds,
     Stage,
     UdfNative,
-    Spill,
     Dictionaries,
 }

 pub fn collect_lazy_dir(file_path: &Path, lazy_dirs: &mut HashSet<LazyDir>) -> Result<()> {
     let file_path = file_path.to_str().unwrap_or_default();
-    if file_path.contains("tpch/") {
+    if file_path.contains("tpch/") || file_path.contains("tpch_spill/") {
         if !lazy_dirs.contains(&LazyDir::Tpch) {
             lazy_dirs.insert(LazyDir::Tpch);
         }
@@ -184,55 +182,51 @@ pub fn collect_lazy_dir(file_path: &Path, lazy_dirs: &mut HashSet<LazyDir>) -> Result<()> {
         if !lazy_dirs.contains(&LazyDir::UdfNative) {
             lazy_dirs.insert(LazyDir::UdfNative);
         }
-    } else if file_path.contains("spill/") {
-        if !lazy_dirs.contains(&LazyDir::Spill) {
-            lazy_dirs.insert(LazyDir::Spill);
-        }
     } else if file_path.contains("dictionaries/") && !lazy_dirs.contains(&LazyDir::Dictionaries) {
         lazy_dirs.insert(LazyDir::Dictionaries);
     }
     Ok(())
 }

-pub fn lazy_prepare_data(lazy_dirs: &HashSet<LazyDir>) -> Result<()> {
+pub fn lazy_prepare_data(lazy_dirs: &HashSet<LazyDir>, force_load: bool) -> Result<()> {
+    let force_load_flag = if force_load { "1" } else { "0" };
     for lazy_dir in lazy_dirs {
         match lazy_dir {
             LazyDir::Tpch => {
                 PREPARE_TPCH.call_once(|| {
                     println!("Calling the script prepare_tpch_data.sh ...");
-                    run_script("prepare_tpch_data.sh").unwrap();
+                    run_script("prepare_tpch_data.sh", &["tpch_test", force_load_flag]).unwrap();
                 });
             }
             LazyDir::Tpcds => {
                 PREPARE_TPCDS.call_once(|| {
                     println!("Calling the script prepare_tpcds_data.sh ...");
-                    run_script("prepare_tpcds_data.sh").unwrap();
+                    run_script("prepare_tpcds_data.sh", &["tpcds", force_load_flag]).unwrap();
                 });
             }
             LazyDir::Stage => {
                 PREPARE_STAGE.call_once(|| {
                     println!("Calling the script prepare_stage.sh ...");
-                    run_script("prepare_stage.sh").unwrap();
+                    run_script("prepare_stage.sh", &[]).unwrap();
                 });
             }
             LazyDir::UdfNative => {
                 println!("wasm context Calling the script prepare_stage.sh ...");
-                PREPARE_WASM.call_once(|| run_script("prepare_stage.sh").unwrap())
-            }
-            LazyDir::Spill => {
-                println!("Calling the script prepare_spill_data.sh ...");
-                PREPARE_SPILL.call_once(|| run_script("prepare_spill_data.sh").unwrap())
+                PREPARE_WASM.call_once(|| run_script("prepare_stage.sh", &[]).unwrap())
             }
             _ => {}
         }
     }
     Ok(())
 }

-fn run_script(name: &str) -> Result<()> {
+fn run_script(name: &str, args: &[&str]) -> Result<()> {
     let path = format!("tests/sqllogictests/scripts/{}", name);
+    let mut new_args = vec![path.as_str()];
+    new_args.extend_from_slice(args);
+
     let output = std::process::Command::new("bash")
-        .arg(path)
+        .args(new_args)
         .output()
         .expect("failed to execute process");
     if !output.status.success() {
(changed sqllogictest suite file; name not shown in the rendered diff)
@@ -4,6 +4,10 @@ select (number % 2) as a from numbers(5) group by grouping sets (a) order by a;
 0
 1

+statement ok
+select sum(number), number % 3 a, grouping(number % 3)+grouping(number % 4) AS lochierarchy from numbers(10)
+group by rollup(number % 3, number % 4) order by grouping(number % 3)+grouping(number % 4) ;
+
 query TT
 select number % 2 as a, number % 3 as b from numbers(24) group by grouping sets ((a,b), (a), (b)) order by a,b;
 ----
(diffs for the remaining changed files were not loaded in this view)