Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions .github/ci-scripts/job_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import argparse
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import duckdb
from ray.job_submission import JobStatus, JobSubmissionClient


def parse_env_var_str(env_var_str: str) -> dict:
iter = map(
lambda s: s.strip().split("="),
filter(lambda s: s, env_var_str.split(",")),
)
return {k: v for k, v in iter}


async def print_logs(logs):
async for lines in logs:
print(lines, end="")


async def wait_on_job(logs, timeout_s):
await asyncio.wait_for(print_logs(logs), timeout=timeout_s)


def generate_data():
datadir = Path(__file__).parents[2] / "gendata"
datadir.mkdir(parents=True, exist_ok=True)
scale_factor = 0.01
db = duckdb.connect(database=datadir / "tpcds.db")
db.sql(f"call dsdgen(sf = {scale_factor})")
for item in db.sql("show tables").fetchall():
tbl = item[0]
parquet_file = datadir / f"{tbl}.parquet"
print(f"Exporting {tbl} to {parquet_file}")
db.sql(f"COPY {tbl} TO '{parquet_file}'")


@dataclass
class Result:
query: int
duration: timedelta
error_msg: Optional[str]


def submit_job(
working_dir: Path,
entrypoint_script: str,
entrypoint_args: str,
env_vars: str,
enable_ray_tracing: bool,
):
generate_data()

env_vars_dict = parse_env_var_str(env_vars)
if enable_ray_tracing:
env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"

client = JobSubmissionClient(address="http://localhost:8265")

if entrypoint_args.startswith("[") and entrypoint_args.endswith("]"):
# this is a json-encoded list of strings; parse accordingly
list_of_entrypoint_args: list[str] = json.loads(entrypoint_args)
else:
list_of_entrypoint_args: list[str] = [entrypoint_args]

results = []

for index, args in enumerate(list_of_entrypoint_args):
entrypoint = f"DAFT_RUNNER=ray python {entrypoint_script} {args}"
print(f"{entrypoint=}")
start = datetime.now()
job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env={
"working_dir": working_dir,
"env_vars": env_vars_dict,
},
)

asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=60 * 30))

status = client.get_job_status(job_id)
assert status.is_terminal(), "Job should have terminated"
end = datetime.now()
duration = end - start
error_msg = None
if status != JobStatus.SUCCEEDED:
job_info = client.get_job_info(job_id)
error_msg = job_info.message

result = Result(query=index, duration=duration, error_msg=error_msg)
results.append(result)

print(f"{results=}")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--working-dir", type=Path, required=True)
parser.add_argument("--entrypoint-script", type=str, required=True)
parser.add_argument("--entrypoint-args", type=str, required=True)
parser.add_argument("--env-vars", type=str, required=True)
parser.add_argument("--enable-ray-tracing", action="store_true")

args = parser.parse_args()

working_dir: Path = args.working_dir
assert working_dir.exists() and working_dir.is_dir(), "The working dir must exist and be directory"
entrypoint: Path = working_dir / args.entrypoint_script
assert entrypoint.exists() and entrypoint.is_file(), "The entrypoint script must exist and be a file"

submit_job(
working_dir=working_dir,
entrypoint_script=args.entrypoint_script,
entrypoint_args=args.entrypoint_args,
env_vars=args.env_vars,
enable_ray_tracing=args.enable_ray_tracing,
)
2 changes: 2 additions & 0 deletions .github/ci-scripts/templatize_ray_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,7 @@ class Metadata(BaseModel, extra="allow"):
if metadata:
metadata = Metadata(**metadata)
content = content.replace(OTHER_INSTALL_PLACEHOLDER, " ".join(metadata.dependencies))
else:
content = content.replace(OTHER_INSTALL_PLACEHOLDER, "")

print(content)
37 changes: 15 additions & 22 deletions .github/workflows/run-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,23 @@ jobs:
run: |
uv v
source .venv/bin/activate
uv pip install ray[default] boto3
uv pip install ray[default] boto3 duckdb
- name: Dynamically update ray config file
run: |
source .venv/bin/activate
(cat .github/assets/template.yaml | \
uv run \
--python 3.12 \
.github/ci-scripts/templatize_ray_config.py \
--cluster-name "ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url '${{ inputs.daft_wheel_url }}' \
--daft-version '${{ inputs.daft_version }}' \
--python-version '${{ inputs.python_version }}' \
--cluster-profile '${{ inputs.cluster_profile }}' \
--working-dir '${{ inputs.working_dir }}' \
--entrypoint-script '${{ inputs.entrypoint_script }}'
--cluster-name="ray-ci-run-${{ github.run_id }}_${{ github.run_attempt }}" \
--daft-wheel-url='${{ inputs.daft_wheel_url }}' \
--daft-version='${{ inputs.daft_version }}' \
--python-version='${{ inputs.python_version }}' \
--cluster-profile='${{ inputs.cluster_profile }}' \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}'
) >> .github/assets/ray.yaml
cat .github/assets/ray.yaml
- name: Setup ray env vars
run: |
source .venv/bin/activate
ray_env_var=$(python .github/ci-scripts/format_env_vars.py \
--env-vars '${{ inputs.env_vars }}' \
--enable-ray-tracing \
)
echo $ray_env_var
echo "ray_env_var=$ray_env_var" >> $GITHUB_ENV
- name: Download private ssh key
run: |
KEY=$(aws secretsmanager get-secret-value --secret-id ci-github-actions-ray-cluster-key-3 --query SecretString --output text)
Expand All @@ -117,11 +108,13 @@ jobs:
echo 'Invalid command submitted; command cannot be empty'
exit 1
fi
ray job submit \
--working-dir ${{ inputs.working_dir }} \
--address http://localhost:8265 \
--runtime-env-json "$ray_env_var" \
-- python ${{ inputs.entrypoint_script }} ${{ inputs.entrypoint_args }}
rm -rf daft
python .github/ci-scripts/job_runner.py \
--working-dir='${{ inputs.working_dir }}' \
--entrypoint-script='${{ inputs.entrypoint_script }}' \
--entrypoint-args='${{ inputs.entrypoint_args }}' \
--env-vars='${{ inputs.env_vars }}' \
--enable-ray-tracing
- name: Download log files from ray cluster
run: |
source .venv/bin/activate
Expand Down
5 changes: 5 additions & 0 deletions benchmarking/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import daft

df = daft.from_pydict({"nums": [1, 2, 3]})

df.show()
6 changes: 6 additions & 0 deletions benchmarking/tpcds/ray_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ def run(
)
args = parser.parse_args()

d = Path(".")
assert d.exists()
for x in d.iterdir():
print(f"subpath: {x}")

tpcds_gen_folder: Path = args.tpcds_gen_folder
print(f"{tpcds_gen_folder=}")
assert tpcds_gen_folder.exists()
assert args.question in range(1, 100)

Expand Down
Loading