Skip to content

Commit

Permalink
Implement TPCH substrait integration test, support tpch_13, tpch_14, …
Browse files Browse the repository at this point in the history
…tpch_16 (#11405)

optimize code
  • Loading branch information
Lordworms committed Jul 12, 2024
1 parent 02335eb commit bd25e26
Show file tree
Hide file tree
Showing 4 changed files with 2,808 additions and 1 deletion.
86 changes: 85 additions & 1 deletion datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ mod tests {
}
Ok(ctx)
}

#[tokio::test]
async fn tpch_test_1() -> Result<()> {
let ctx = create_context(vec![(
Expand Down Expand Up @@ -314,4 +313,89 @@ mod tests {
\n TableScan: FILENAME_PLACEHOLDER_2 projection=[n_nationkey, n_name, n_regionkey, n_comment]");
Ok(())
}

// missing query 12
#[tokio::test]
async fn tpch_test_13() -> Result<()> {
let ctx = create_context(vec![
("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
])
.await?;
let path = "tests/testdata/tpch_substrait_plans/query_13.json";
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");

let plan = from_substrait_plan(&ctx, &proto).await?;
let plan_str = format!("{:?}", plan);
assert_eq!(plan_str, "Projection: count(FILENAME_PLACEHOLDER_1.o_orderkey) AS C_COUNT, count(Int64(1)) AS CUSTDIST\
\n Sort: count(Int64(1)) DESC NULLS FIRST, count(FILENAME_PLACEHOLDER_1.o_orderkey) DESC NULLS FIRST\
\n Projection: count(FILENAME_PLACEHOLDER_1.o_orderkey), count(Int64(1))\
\n Aggregate: groupBy=[[count(FILENAME_PLACEHOLDER_1.o_orderkey)]], aggr=[[count(Int64(1))]]\
\n Projection: count(FILENAME_PLACEHOLDER_1.o_orderkey)\
\n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.c_custkey]], aggr=[[count(FILENAME_PLACEHOLDER_1.o_orderkey)]]\
\n Projection: FILENAME_PLACEHOLDER_0.c_custkey, FILENAME_PLACEHOLDER_1.o_orderkey\
\n Left Join: FILENAME_PLACEHOLDER_0.c_custkey = FILENAME_PLACEHOLDER_1.o_custkey Filter: NOT FILENAME_PLACEHOLDER_1.o_comment LIKE CAST(Utf8(\"%special%requests%\") AS Utf8)\
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment]\
\n TableScan: FILENAME_PLACEHOLDER_1 projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment]");
Ok(())
}

#[tokio::test]
async fn tpch_test_14() -> Result<()> {
let ctx = create_context(vec![
("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/lineitem.csv"),
("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/part.csv"),
])
.await?;
let path = "tests/testdata/tpch_substrait_plans/query_14.json";
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");

let plan = from_substrait_plan(&ctx, &proto).await?;
let plan_str = format!("{:?}", plan);
assert_eq!(plan_str, "Projection: Decimal128(Some(10000),5,2) * sum(CASE WHEN FILENAME_PLACEHOLDER_1.p_type LIKE CAST(Utf8(\"PROMO%\") AS Utf8) THEN FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount ELSE Decimal128(Some(0),19,0) END) / sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount) AS PROMO_REVENUE\
\n Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN FILENAME_PLACEHOLDER_1.p_type LIKE CAST(Utf8(\"PROMO%\") AS Utf8) THEN FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount ELSE Decimal128(Some(0),19,0) END), sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount)]]\
\n Projection: CASE WHEN FILENAME_PLACEHOLDER_1.p_type LIKE CAST(Utf8(\"PROMO%\") AS Utf8) THEN FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount) ELSE Decimal128(Some(0),19,0) END, FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount)\
\n Filter: FILENAME_PLACEHOLDER_0.l_partkey = FILENAME_PLACEHOLDER_1.p_partkey AND FILENAME_PLACEHOLDER_0.l_shipdate >= Date32(\"1995-09-01\") AND FILENAME_PLACEHOLDER_0.l_shipdate < CAST(Utf8(\"1995-10-01\") AS Date32)\
\n Inner Join: Filter: Boolean(true)\
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
\n TableScan: FILENAME_PLACEHOLDER_1 projection=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment]");
Ok(())
}
// query 15 is missing
#[tokio::test]
async fn tpch_test_16() -> Result<()> {
let ctx = create_context(vec![
("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/partsupp.csv"),
("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/part.csv"),
("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/supplier.csv"),
])
.await?;
let path = "tests/testdata/tpch_substrait_plans/query_16.json";
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");

let plan = from_substrait_plan(&ctx, &proto).await?;
let plan_str = format!("{:?}", plan);
assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_1.p_brand AS P_BRAND, FILENAME_PLACEHOLDER_1.p_type AS P_TYPE, FILENAME_PLACEHOLDER_1.p_size AS P_SIZE, count(DISTINCT FILENAME_PLACEHOLDER_0.ps_suppkey) AS SUPPLIER_CNT\
\n Sort: count(DISTINCT FILENAME_PLACEHOLDER_0.ps_suppkey) DESC NULLS FIRST, FILENAME_PLACEHOLDER_1.p_brand ASC NULLS LAST, FILENAME_PLACEHOLDER_1.p_type ASC NULLS LAST, FILENAME_PLACEHOLDER_1.p_size ASC NULLS LAST\
\n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_1.p_brand, FILENAME_PLACEHOLDER_1.p_type, FILENAME_PLACEHOLDER_1.p_size]], aggr=[[count(DISTINCT FILENAME_PLACEHOLDER_0.ps_suppkey)]]\
\n Projection: FILENAME_PLACEHOLDER_1.p_brand, FILENAME_PLACEHOLDER_1.p_type, FILENAME_PLACEHOLDER_1.p_size, FILENAME_PLACEHOLDER_0.ps_suppkey\
\n Filter: FILENAME_PLACEHOLDER_1.p_partkey = FILENAME_PLACEHOLDER_0.ps_partkey AND FILENAME_PLACEHOLDER_1.p_brand != CAST(Utf8(\"Brand#45\") AS Utf8) AND NOT FILENAME_PLACEHOLDER_1.p_type LIKE CAST(Utf8(\"MEDIUM POLISHED%\") AS Utf8) AND (FILENAME_PLACEHOLDER_1.p_size = Int32(49) OR FILENAME_PLACEHOLDER_1.p_size = Int32(14) OR FILENAME_PLACEHOLDER_1.p_size = Int32(23) OR FILENAME_PLACEHOLDER_1.p_size = Int32(45) OR FILENAME_PLACEHOLDER_1.p_size = Int32(19) OR FILENAME_PLACEHOLDER_1.p_size = Int32(3) OR FILENAME_PLACEHOLDER_1.p_size = Int32(36) OR FILENAME_PLACEHOLDER_1.p_size = Int32(9)) AND NOT CAST(FILENAME_PLACEHOLDER_0.ps_suppkey IN (<subquery>) AS Boolean)\
\n Subquery:\
\n Projection: FILENAME_PLACEHOLDER_2.s_suppkey\
\n Filter: FILENAME_PLACEHOLDER_2.s_comment LIKE CAST(Utf8(\"%Customer%Complaints%\") AS Utf8)\
\n TableScan: FILENAME_PLACEHOLDER_2 projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\
\n Inner Join: Filter: Boolean(true)\
\n TableScan: FILENAME_PLACEHOLDER_0 projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
\n TableScan: FILENAME_PLACEHOLDER_1 projection=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment]");
Ok(())
}
}
Loading

0 comments on commit bd25e26

Please sign in to comment.