Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 54 additions & 35 deletions release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,42 +421,61 @@ def run_release_test_kuberay(
smoke_test: bool = False,
test_definition_root: Optional[str] = None,
) -> Result:
result.stable = test.get("stable", True)
result.smoke_test = smoke_test
cluster_compute = load_test_cluster_compute(test, test_definition_root)
kuberay_compute_config = convert_cluster_compute_to_kuberay_compute_config(
cluster_compute
)
kuberay_autoscaler_version = cluster_compute.get("autoscaler_version", None)
if kuberay_autoscaler_version:
kuberay_autoscaler_config = {"version": kuberay_autoscaler_version}
else:
kuberay_autoscaler_config = None
working_dir_upload_path = upload_working_dir(get_working_dir(test))
start_time = time.monotonic()
pipeline_exception = None
try:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wrapping does not really feel right. To start, catching all `Exception`s is not the right thing to do in most cases. Could you explain what exactly you are trying to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using try/except here mostly to retain any exception that can be thrown while the commands inside the `try` block run. `run_release_test_anyscale` implements the same pattern. The purpose (I think) is to report the error code that matches whatever failed during the process, which is why the original exception (and its message) needs to be retained.

result.stable = test.get("stable", True)
result.smoke_test = smoke_test
cluster_compute = load_test_cluster_compute(test, test_definition_root)
kuberay_compute_config = convert_cluster_compute_to_kuberay_compute_config(
cluster_compute
)
kuberay_autoscaler_version = cluster_compute.get("autoscaler_version", None)
if kuberay_autoscaler_version:
kuberay_autoscaler_config = {"version": kuberay_autoscaler_version}
else:
kuberay_autoscaler_config = None
working_dir_upload_path = upload_working_dir(get_working_dir(test))

command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
test_name_hash = hashlib.sha256(test["name"].encode()).hexdigest()[:10]
# random 8 digit suffix
random_suffix = "".join(random.choices(string.digits, k=8))
base_job_name = f"{test['name'][:20]}-{test_name_hash}-{random_suffix}"
job_name = base_job_name.replace("_", "-")
logger.info(f"Job name: {job_name}")
kuberay_job_manager = KubeRayJobManager()
retcode, duration = kuberay_job_manager.run_and_wait(
job_name=job_name,
image=test.get_anyscale_byod_image(),
cmd_to_run=test["run"]["script"],
env_vars=test.get_byod_runtime_env(),
working_dir=working_dir_upload_path,
pip=test.get_byod_pips(),
compute_config=kuberay_compute_config,
autoscaler_config=kuberay_autoscaler_config,
timeout=command_timeout,
)
kuberay_job_manager.fetch_results()
result.return_code = retcode
result.runtime = duration
except Exception as e:
logger.info(f"Exception: {e}")
pipeline_exception = e
result.runtime = time.monotonic() - start_time

command_timeout = int(test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT))
test_name_hash = hashlib.sha256(test["name"].encode()).hexdigest()[:10]
# random 8 digit suffix
random_suffix = "".join(random.choices(string.digits, k=8))
job_name = f"{test['name'][:20]}-{test_name_hash}-{random_suffix}".replace("_", "-")
logger.info(f"Job name: {job_name}")
logger.info(f"KubeRay compute config: {kuberay_compute_config}")
logger.info(f"KubeRay autoscaler config: {kuberay_autoscaler_config}")
kuberay_job_manager = KubeRayJobManager()
retcode, duration = kuberay_job_manager.run_and_wait(
job_name=job_name,
image=test.get_anyscale_byod_image(),
cmd_to_run=test["run"]["script"],
env_vars=test.get_byod_runtime_env(),
working_dir=working_dir_upload_path,
pip=test.get_byod_pips(),
compute_config=kuberay_compute_config,
autoscaler_config=kuberay_autoscaler_config,
timeout=command_timeout,
)
kuberay_job_manager.fetch_results()
result.return_code = retcode
result.runtime = duration
if pipeline_exception:
buildkite_group(":rotating_light: Handling errors")
exit_code, result_status, runtime = handle_exception(
pipeline_exception,
result.runtime,
)

result.return_code = exit_code.value
result.status = result_status.value
if runtime is not None:
result.runtime = runtime
raise pipeline_exception
return result


Expand Down
64 changes: 54 additions & 10 deletions release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def setUp(self) -> None:
self.sdk.returns["get_cloud"] = APIDict(result=APIDict(provider="AWS"))

self.writeClusterEnv("{'env': true}")
self.writeClusterCompute("{'compute': true}")
self.writeClusterCompute(
"{'head_node_type': {'name': 'head_node', 'instance_type': 'm5a.4xlarge'}, 'worker_node_types': []}"
)

with open(os.path.join(self.tempdir, "driver_fail.sh"), "wt") as f:
f.write("exit 1\n")
Expand Down Expand Up @@ -175,6 +177,23 @@ def mock_alerter(test: Test, result: Result):
),
alert="unit_test_alerter",
)
self.kuberay_test = MockTest(
name="unit_test_end_to_end_kuberay",
run=dict(
type="unit_test",
prepare="prepare_cmd",
script="test_cmd",
wait_for_nodes=dict(num_nodes=4, timeout=40),
),
working_dir=self.tempdir,
cluster=dict(
cluster_env="cluster_env.yaml",
cluster_compute="cluster_compute.yaml",
byod={},
),
env="kuberay",
alert="unit_test_alerter",
)
self.anyscale_project = "prj_unit12345678"

def tearDown(self) -> None:
Expand Down Expand Up @@ -237,42 +256,67 @@ def _succeed_until(self, until: str):

self.mock_alert_return = None

def _run(self, result: Result, **kwargs):
run_release_test(
test=self.test,
anyscale_project=self.anyscale_project,
result=result,
log_streaming_limit=1000,
**kwargs
)
def _run(self, result: Result, kuberay: bool = False, **kwargs):
    """Invoke ``run_release_test`` on the regular or the KubeRay fixture.

    Args:
        result: Result object handed to ``run_release_test``.
        kuberay: When true, run ``self.kuberay_test``; otherwise run
            ``self.test`` together with ``self.anyscale_project``.
        **kwargs: Extra keyword arguments forwarded unchanged.
    """
    if kuberay:
        # The KubeRay fixture is run without an Anyscale project argument.
        target = {"test": self.kuberay_test}
    else:
        target = {
            "test": self.test,
            "anyscale_project": self.anyscale_project,
        }
    run_release_test(
        result=result,
        log_streaming_limit=1000,
        **target,
        **kwargs,
    )

def testInvalidClusterCompute(self):
    """Check that a broken cluster compute config yields CONFIG_ERROR.

    Every failure mode is exercised twice: once through the regular run
    and once through the KubeRay run. Each run gets its own ``Result`` so
    that a return code left behind by an earlier run cannot satisfy a
    later assertion.
    """

    def expect_config_error(kuberay, regex=None):
        # One run: it must raise ReleaseTestConfigError (optionally with a
        # message matching ``regex``) and record the CONFIG_ERROR exit code.
        result = Result()
        if regex is None:
            raises = self.assertRaises(ReleaseTestConfigError)
        else:
            raises = self.assertRaisesRegex(ReleaseTestConfigError, regex)
        with raises:
            self._run(result, kuberay=kuberay)
        # Asserted outside the ``with`` block: statements placed after the
        # raising call inside it would never execute.
        self.assertEqual(result.return_code, ExitCode.CONFIG_ERROR.value)

    run_modes = (False, True)  # regular run first, then KubeRay run

    # Fails because loading the cluster compute raises
    for kuberay in run_modes:
        with patch(
            "ray_release.glue.load_test_cluster_compute",
            _fail_on_call(ReleaseTestConfigError),
        ):
            expect_config_error(kuberay)

    # Fails because file not found
    os.unlink(os.path.join(self.tempdir, "cluster_compute.yaml"))
    for kuberay in run_modes:
        expect_config_error(kuberay, "Path not found")

    # Fails because invalid jinja template
    self.writeClusterCompute("{{ INVALID")
    for kuberay in run_modes:
        expect_config_error(kuberay, "yaml template")

    # Fails because invalid json
    self.writeClusterCompute("{'test': true, 'fail}")
    for kuberay in run_modes:
        expect_config_error(kuberay, "quoted scalar")

def testStartClusterFails(self):
Expand Down