Skip to content

Commit cb4a2ee

Browse files
authored
Merge branch 'Eventual-Inc:main' into uc-write-deltalake
2 parents 20ee1c4 + f23ee37 commit cb4a2ee

File tree

31 files changed

+1705
-122
lines changed

31 files changed

+1705
-122
lines changed

.github/workflows/pr-labeller.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ jobs:
1616
- name: PR Conventional Commit Validation
1717
uses: ytanikin/[email protected]
1818
with:
19-
task_types: '["feat","fix","docs","test","ci","refactor","perf","chore","revert"]'
19+
task_types: '["feat","fix","docs","test","ci","refactor","perf","chore","revert","build"]'
2020
add_label: 'true'

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ build
1616

1717
**/node_modules/**
1818
data/**
19+
benchmarking/tpcds/data
1920
*.so
2021
*.whl
2122
log/

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ hooks: .venv
5656

5757
.PHONY: build
5858
build: check-toolchain .venv ## Compile and install Daft for development
59-
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --extras=all
59+
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --extras=all --uv
6060

6161
.PHONY: build-release
6262
build-release: check-toolchain .venv ## Compile and install a faster Daft binary
63-
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --release
63+
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --release --uv
6464

6565
.PHONY: test
6666
test: .venv build ## Run tests

benchmarking/tpcds/__init__.py

Whitespace-only changes.

benchmarking/tpcds/__main__.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import argparse
2+
import logging
3+
import typing
4+
from dataclasses import dataclass
5+
from datetime import datetime, timedelta
6+
from pathlib import Path
7+
from typing import Optional
8+
9+
import ray
10+
11+
import daft
12+
13+
from ..tpch import __main__ as tpch
14+
from ..tpch import ray_job_runner
15+
from . import datagen, helpers
16+
17+
logger = logging.getLogger(__name__)
18+
19+
SQL_QUERIES_PATH = Path(__file__).parent / "queries"
20+
21+
22+
@dataclass
class ParsedArgs:
    """Command-line arguments for the TPC-DS benchmark entrypoint, as parsed in `__main__`."""

    # Root folder holding (or about to hold) the generated TPC-DS data;
    # `main` appends a per-scale-factor subfolder to it.
    tpcds_gen_folder: Path
    # Scale factor to run on in GB (must be > 0; asserted by the caller).
    scale_factor: float
    # Raw questions selector (e.g. "*" or "1,3-5"); parsed by helpers.parse_questions_str.
    questions: str
    # Address of the ray cluster head node; None when not provided.
    ray_address: Optional[str]
    # When True, only query plans are printed; no query is executed.
    dry_run: bool
29+
30+
31+
@dataclass
class RunArgs:
    """Resolved arguments for a benchmark run, built by `main` after datagen and question parsing."""

    # Folder containing the parquet tables for the chosen scale factor.
    scaled_tpcds_gen_folder: Path
    # The concrete question indices to run.
    query_indices: list[int]
    # Address of the ray cluster head node; None when not provided.
    ray_address: Optional[str]
    # When True, only query plans are printed; no query is executed.
    dry_run: bool
37+
38+
39+
@dataclass
class Result:
    """Outcome of one TPC-DS query: exactly one of `duration` (success) or `error_msg` (failure) is set."""

    # 1-based TPC-DS question index this result belongs to.
    index: int
    # Wall-clock time of a successful run; None on failure.
    duration: Optional[timedelta]
    # Error message of a failed run; None on success.
    error_msg: Optional[str]

    def __repr__(self) -> str:
        # Compare against None explicitly: a zero-length timedelta is falsy,
        # so the original truthiness checks would misclassify an
        # instantaneous success as "both fields are None".
        if self.duration is not None and self.error_msg is not None:
            # `typing.assert_never` expects a `Never`-typed value for
            # exhaustiveness checks; for a runtime invariant violation a
            # plain exception is the correct tool.
            raise ValueError("Both duration and error_msg are not None")
        elif self.duration is not None:
            return f"(Q{self.index} SUCCESS - duration: {self.duration})"
        elif self.error_msg is not None:
            return f"(Q{self.index} FAILURE - error msg: {self.error_msg})"
        else:
            raise ValueError("Both duration and error_msg are None")
54+
55+
56+
def run_query_on_ray(
    run_args: RunArgs,
) -> list[Result]:
    """Submit each selected TPC-DS question as a Ray job and collect per-query results.

    Connects to the cluster at `run_args.ray_address` (None falls back to
    ray's default). A failed submission is captured in the Result's
    `error_msg` instead of being raised.
    """
    ray.init(address=run_args.ray_address if run_args.ray_address else None)
    results = []

    for query_index in run_args.query_indices:
        # The whole benchmarking/tpcds folder is shipped as the job's working
        # directory so the entrypoint script and the queries travel with it.
        working_dir = Path("benchmarking") / "tpcds"
        ray_entrypoint_script = "ray_entrypoint.py"
        duration = None
        error_msg = None
        try:
            start = datetime.now()
            ray_job_runner.run_on_ray(
                run_args.ray_address,
                {
                    # NOTE(review): the gen folder is hardcoded to 'data/0.01'
                    # instead of being derived from
                    # run_args.scaled_tpcds_gen_folder / the scale factor —
                    # this looks wrong for any scale factor other than 0.01;
                    # confirm against the ray working-dir layout and parameterize.
                    "entrypoint": f"python {ray_entrypoint_script} --tpcds-gen-folder 'data/0.01' --question {query_index} {'--dry-run' if run_args.dry_run else ''}",
                    "runtime_env": {
                        "working_dir": working_dir,
                    },
                },
            )
            end = datetime.now()
            # Duration covers submission + scheduling + execution of the job,
            # not just the query itself.
            duration = end - start
        except Exception as e:
            error_msg = str(e)

        results.append(Result(index=query_index, duration=duration, error_msg=error_msg))

    return results
86+
87+
88+
def run_query_on_local(
    run_args: RunArgs,
) -> list[Result]:
    """Run each selected TPC-DS question with the local (py/native) runner.

    Returns one `Result` per question index; a failure is captured in the
    Result's `error_msg` instead of being raised, so later questions still run.
    """
    catalog = helpers.generate_catalog(run_args.scaled_tpcds_gen_folder)
    results = []

    for query_index in run_args.query_indices:
        query_file = SQL_QUERIES_PATH / f"{query_index:02}.sql"
        with open(query_file) as f:
            query = f.read()

        start = datetime.now()

        duration = None
        error_msg = None
        try:
            # Plan the query once and reuse the DataFrame for both the explain
            # and the (optional) execution, instead of planning it twice.
            df = daft.sql(query, catalog=catalog)
            df.explain(show_all=True)
            if not run_args.dry_run:
                df.collect()

            end = datetime.now()
            duration = end - start
        except Exception as e:
            error_msg = str(e)

        results.append(Result(index=query_index, duration=duration, error_msg=error_msg))

    return results
116+
117+
118+
def run_benchmarks(
    run_args: RunArgs,
) -> list[Result]:
    """Dispatch the selected questions to whichever runner daft is configured to use."""
    logger.info("Running the following questions: %s", run_args.query_indices)

    runner = tpch.get_daft_benchmark_runner_name()
    logger.info("Running on the following runner: %s", runner)

    if runner == "ray":
        return run_query_on_ray(run_args)
    if runner in ("py", "native"):
        return run_query_on_local(run_args)
    typing.assert_never(runner)
139+
140+
141+
def main(args: ParsedArgs):
    """Generate the TPC-DS dataset (if not already present) and run the requested questions."""
    data_dir = args.tpcds_gen_folder / str(args.scale_factor)
    datagen.gen_tpcds(data_dir, args.scale_factor)

    results = run_benchmarks(
        RunArgs(
            scaled_tpcds_gen_folder=data_dir,
            query_indices=helpers.parse_questions_str(args.questions),
            ray_address=args.ray_address,
            dry_run=args.dry_run,
        )
    )

    # TODO(ronnie): improve visualization of results; simply printing them to console is not the best way...
    print(f"{results=}")
156+
157+
158+
if __name__ == "__main__":
    logging.basicConfig(level="INFO")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--tpcds-gen-folder",
        default="benchmarking/tpcds/data",
        type=Path,
        help="Path to the folder containing the TPC-DS dsdgen tool and generated data",
    )
    parser.add_argument("--scale-factor", default=0.01, type=float, help="Scale factor to run on in GB")
    parser.add_argument("--questions", default="*", type=str, help="The questions to run")
    parser.add_argument("--ray-address", type=str, help="The address of the head node of the ray cluster")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Whether to run in dry-run mode; if true, only the plan will be printed, but no query will be executed",
    )
    args = parser.parse_args()

    tpcds_gen_folder: Path = args.tpcds_gen_folder
    # Validate through argparse rather than `assert`: asserts are stripped
    # under `python -O`, and parser.error exits with a usage message.
    if args.scale_factor <= 0:
        parser.error(f"--scale-factor must be positive; got {args.scale_factor}")

    main(
        ParsedArgs(
            tpcds_gen_folder=tpcds_gen_folder,
            scale_factor=args.scale_factor,
            questions=args.questions,
            ray_address=args.ray_address,
            dry_run=args.dry_run,
        )
    )

benchmarking/tpcds/datagen.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,50 @@
11
import argparse
22
import logging
3-
import os
3+
from pathlib import Path
44

55
import duckdb
66

77
logger = logging.getLogger(__name__)
88

99

10-
def gen_tpcds(dir: Path, scale_factor: float):
    """Generate TPC-DS data via duckdb's dsdgen and export every table as parquet into `dir`.

    If `dir` already exists, generation is skipped entirely on the assumption
    that it holds the output of a previous run.
    """
    if dir.exists():
        assert dir.is_dir(), "The location in which to generate the data must be a directory"
        logger.info("The directory %s already exists; doing nothing", dir)
        return

    dir.mkdir(parents=True, exist_ok=True)
    db = duckdb.connect(database=dir / "tpcds.db")
    db.sql(f"call dsdgen(sf = {scale_factor})")
    for row in db.sql("show tables").fetchall():
        table_name = row[0]
        parquet_file = dir / f"{table_name}.parquet"
        print(f"Exporting {table_name} to {parquet_file}")
        db.sql(f"COPY {table_name} TO '{parquet_file}'")
1927

2028

2129
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--tpcds-gen-folder",
        default="data/tpcds-dbgen",
        type=Path,
        help="Path to the folder containing the TPC-DS dsdgen tool and generated data",
    )
    parser.add_argument("--scale-factor", default=0.01, help="Scale factor to run on in GB", type=float)
    args = parser.parse_args()

    tpcds_gen_folder: Path = args.tpcds_gen_folder
    # Validate through argparse rather than `assert`: asserts are stripped
    # under `python -O`, and parser.error exits with a usage message.
    if args.scale_factor <= 0:
        parser.error(f"--scale-factor must be positive; got {args.scale_factor}")

    logger.info(
        "Generating data at %s with: scale_factor=%s",
        tpcds_gen_folder,
        args.scale_factor,
    )

    # Each scale factor gets its own subdirectory so multiple scale factors
    # can coexist under the same gen folder.
    scaled_tpcds_gen_folder = tpcds_gen_folder / str(args.scale_factor)
    gen_tpcds(dir=scaled_tpcds_gen_folder, scale_factor=args.scale_factor)

benchmarking/tpcds/helpers.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from pathlib import Path
2+
3+
import daft
4+
from daft.sql.sql import SQLCatalog
5+
6+
7+
def generate_catalog(dir: Path):
    """Build a SQLCatalog mapping each parquet file in `dir` to a table named after the file's stem."""
    if not dir.exists():
        raise RuntimeError(f"Directory not found: {dir}")
    tables = {}
    for file in dir.iterdir():
        if file.is_file() and file.suffix == ".parquet":
            tables[file.stem] = daft.read_parquet(path=str(file))
    return SQLCatalog(tables=tables)
17+
18+
19+
def parse_questions_str(questions: str) -> list[int]:
    """Parse a questions selector into a sorted list of question indices.

    Accepts "*" (all of 1..99), comma-separated integers ("1,5,7"), and
    dash-ranges ("3-6"), in any combination ("1,3-6,9"). Duplicates are
    collapsed.

    Raises:
        ValueError: if a comma-separated piece is neither an integer nor a
            well-formed `lower-upper` range with lower <= upper.
    """
    if questions == "*":
        return list(range(1, 100))

    nums = set()
    # filter(None, ...) drops empty pieces so trailing/double commas are tolerated.
    for split in filter(None, questions.split(",")):
        try:
            nums.add(int(split))
        except ValueError:
            parts = split.split("-")
            # Raise ValueError (not AssertionError from `assert`, which is
            # stripped under `python -O`) for malformed input.
            if len(parts) != 2:
                raise ValueError(
                    f"A range must include two numbers split by a dash (i.e., '-'); instead got '{split}'"
                )
            lower_str, upper_str = parts
            try:
                lower = int(lower_str)
                upper = int(upper_str)
            except ValueError:
                raise ValueError(f"Invalid range: {split}")
            if lower > upper:
                raise ValueError(f"Invalid range: {split}")
            nums.update(range(lower, upper + 1))

    # The signature promises list[int]; the original implementation leaked the
    # intermediate set here. Sort for a deterministic, list-typed result.
    return sorted(nums)

benchmarking/tpcds/ray_entrypoint.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import argparse
2+
import sys
3+
from pathlib import Path
4+
5+
import helpers
6+
7+
import daft
8+
9+
10+
def run(
    parquet_folder: Path,
    question: int,
    dry_run: bool,
):
    """Run one TPC-DS question against the parquet tables in `parquet_folder`.

    Always prints the query plan; executes the query only when `dry_run` is
    False. A failure is reported on stderr rather than raised, so the job
    itself does not error out.
    """
    catalog = helpers.generate_catalog(parquet_folder)
    sql_path = Path(__file__).parent / "queries" / f"{question:02}.sql"
    with open(sql_path) as f:
        query = f.read()

    try:
        daft.sql(query, catalog=catalog).explain(show_all=True)
        if not dry_run:
            daft.sql(query, catalog=catalog).collect()
    except Exception as e:
        print(str(e), file=sys.stderr)
26+
27+
28+
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--tpcds-gen-folder",
        required=True,
        type=Path,
        help="Path to the TPC-DS data generation folder",
    )
    parser.add_argument(
        "--question",
        required=True,
        type=int,
        help="The TPC-DS question index to run",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Whether or not to run the query in dry-run mode; if true, only the plan will be printed out",
    )
    args = parser.parse_args()

    tpcds_gen_folder: Path = args.tpcds_gen_folder
    # Validate through argparse rather than `assert`: asserts are stripped
    # under `python -O`, and parser.error exits with a usage message.
    if not tpcds_gen_folder.exists():
        parser.error(f"--tpcds-gen-folder does not exist: {tpcds_gen_folder}")
    if args.question not in range(1, 100):
        parser.error(f"--question must be between 1 and 99; got {args.question}")

    run(args.tpcds_gen_folder, args.question, args.dry_run)

0 commit comments

Comments
 (0)