[Ray] Implements get_chunks_result for Ray execution context #3023

Merged
15 changes: 12 additions & 3 deletions .github/workflows/platform-ci.yml
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard, 3.8-dask]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-ray-dag, 3.8-vineyard, 3.8-dask]
include:
- { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1,
no-deploy: 1, with-kubernetes: "with Kubernetes" }
@@ -28,6 +28,8 @@ jobs:
no-deploy: 1, with-vineyard: "with vineyard" }
- { os: ubuntu-latest, python-version: 3.8-ray, no-common-tests: 1,
no-deploy: 1, with-ray: "with ray" }
- { os: ubuntu-latest, python-version: 3.8-ray-dag, no-common-tests: 1,
no-deploy: 1, with-ray-dag: "with ray dag" }
- { os: ubuntu-latest, python-version: 3.8-dask, no-common-tests: 1,
no-deploy: 1, run-dask: "run dask" }

@@ -51,6 +53,7 @@ jobs:
WITH_KUBERNETES: ${{ matrix.with-kubernetes }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
WITH_RAY_DAG: ${{ matrix.with-ray-dag }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
shell: bash
@@ -67,7 +70,7 @@ jobs:
if [[ $UNAME == "windows" ]]; then
pip install virtualenv flaky
else
pip install virtualenv flaky ray
pip install virtualenv flaky
if [ -n "$WITH_KUBERNETES" ]; then
./.github/workflows/install-minikube.sh
pip install kubernetes
@@ -90,7 +93,7 @@ jobs:
sudo mv /tmp/etcd-download-test/etcdctl /usr/local/bin/
rm -fr /tmp/etcd-$ETCD_VER-linux-amd64.tar.gz /tmp/etcd-download-test
fi
if [ -n "$WITH_RAY" ]; then
if [ -n "$WITH_RAY" ] || [ -n "$WITH_RAY_DAG" ]; then
pip install ray[default]==1.9.2
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0"
fi
@@ -107,6 +110,7 @@ jobs:
WITH_CYTHON: ${{ matrix.with-cython }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
WITH_RAY_DAG: ${{ matrix.with-ray-dag }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1
@@ -143,6 +147,11 @@ jobs:
pytest $PYTEST_CONFIG --durations=0 --timeout=600 -v -s -m ray
coverage report
fi
if [ -n "$WITH_RAY_DAG" ]; then
export MARS_CI_BACKEND=ray
pytest $PYTEST_CONFIG --durations=0 --timeout=600 -v -s -m ray_dag
coverage report
fi
if [ -n "$RUN_DASK" ]; then
pytest $PYTEST_CONFIG mars/contrib/dask/tests/test_dask.py
coverage report
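The new 3.8-ray-dag CI job reuses the regular test suite: it exports MARS_CI_BACKEND=ray and selects the tests carrying the ray_dag pytest marker. As a sketch (an assumption, since the actual registration lives in the repo's pytest configuration), such a marker can be registered in a conftest.py like this:

    # conftest.py -- sketch; Mars may declare the marker in setup.cfg instead
    def pytest_configure(config):
        # Register the custom marker so `pytest -m ray_dag` selects only
        # tests meant to run on the Ray DAG execution backend.
        config.addinivalue_line(
            "markers", "ray_dag: tests that run on the Ray DAG execution backend"
        )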
15 changes: 13 additions & 2 deletions mars/conftest.py
@@ -29,6 +29,7 @@
from mars.utils import lazy_import

ray = lazy_import("ray")
MARS_CI_BACKEND = os.environ.get("MARS_CI_BACKEND", "mars")


@pytest.fixture(scope="module")
@@ -167,7 +168,11 @@ def _new_test_session(_stop_isolation):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
address="test://127.0.0.1", init_local=True, default=True, timeout=300
address="test://127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
default=True,
timeout=300,
)
with option_context({"show_progress": False}):
try:
@@ -181,7 +186,12 @@ def _new_integrated_test_session(_stop_isolation):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
address="127.0.0.1", init_local=True, n_worker=2, default=True, timeout=300
address="127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
n_worker=2,
default=True,
timeout=300,
)
with option_context({"show_progress": False}):
try:
@@ -213,6 +223,7 @@ def _new_gpu_test_session(_stop_isolation): # pragma: no cover

sess = new_test_session(
address="127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
n_worker=1,
n_cpu=1,
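These fixture changes thread the CI backend into every test session via the MARS_CI_BACKEND environment variable. A minimal sketch of the resulting behavior, using the same arguments as the diff above:

    import os

    from mars.deploy.oscar.tests.session import new_test_session

    # Defaults to the regular "mars" backend; the ray-dag CI job exports
    # MARS_CI_BACKEND=ray before invoking pytest.
    backend = os.environ.get("MARS_CI_BACKEND", "mars")

    sess = new_test_session(
        address="test://127.0.0.1",
        backend=backend,
        init_local=True,
        default=True,
        timeout=300,
    )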
1 change: 1 addition & 0 deletions mars/dataframe/base/tests/test_base_execution.py
@@ -774,6 +774,7 @@ def test_isin_execution(setup):
pd.testing.assert_frame_equal(result, expected)


@pytest.mark.ray_dag
def test_cut_execution(setup):
session = setup

25 changes: 22 additions & 3 deletions mars/services/task/execution/ray/context.py
@@ -14,13 +14,15 @@

import functools
import inspect
from typing import Union
import logging
from typing import Union, Dict, List

from .....core.context import Context
from .....utils import implements, lazy_import
from ....context import ThreadedServiceContext

ray = lazy_import("ray")
logger = logging.getLogger(__name__)


class RayRemoteObjectManager:
@@ -89,7 +91,14 @@ def _get_task_state_actor(self) -> "ray.actor.ActorHandle":
@implements(Context.create_remote_object)
def create_remote_object(self, name: str, object_cls, *args, **kwargs):
task_state_actor = self._get_task_state_actor()
task_state_actor.create_remote_object.remote(name, object_cls, *args, **kwargs)
r = task_state_actor.create_remote_object.remote(
name, object_cls, *args, **kwargs
)
# Make sure the remote object has been created on the actor. Otherwise,
# get_remote_object from a worker may not find it, because the callers of
# create_remote_object and get_remote_object may run in different workers.
# A sync Ray actor would require this `ray.get`, too.
ray.get(r)
return _RayRemoteObjectWrapper(task_state_actor, name)
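The added ray.get(r) blocks until the creation task has actually executed on the actor. A self-contained sketch of the race it prevents (the Holder actor is an illustrative stand-in, not a Mars class):

    import ray

    ray.init()

    @ray.remote
    class Holder:
        def __init__(self):
            self._objects = {}

        def create(self, name, value):
            self._objects[name] = value

        def get(self, name):
            return self._objects[name]

    holder = Holder.remote()
    ref = holder.create.remote("x", 42)
    # Without this ray.get, a caller in another worker could invoke get("x")
    # before the create task has run on the actor and hit a KeyError. Blocking
    # here also surfaces any exception raised while constructing the object,
    # which the new _ErrorRemoteObject test below relies on.
    ray.get(ref)
    assert ray.get(holder.get.remote("x")) == 42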

@implements(Context.get_remote_object)
@@ -107,7 +116,17 @@ def destroy_remote_object(self, name: str):
class RayExecutionContext(_RayRemoteObjectContext, ThreadedServiceContext):
"""The context for tiling."""

pass
def __init__(self, task_context: Dict, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task_context = task_context

@implements(Context.get_chunks_result)
def get_chunks_result(self, data_keys: List[str]) -> List:
logger.info("Getting %s chunks result.", len(data_keys))
object_refs = [self._task_context[key] for key in data_keys]
result = ray.get(object_refs)
logger.info("Got %s chunks result.", len(result))
return result


# TODO(fyrestone): Implement more APIs for Ray.
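get_chunks_result is the core of this PR: the executor records an ObjectRef for every output chunk in the shared task_context, and the context resolves chunk keys through that mapping with a single ray.get. A self-contained sketch of the mechanism (this task_context dict is a stand-in for the one the executor populates):

    import ray

    ray.init()

    # The executor fills this mapping as subtasks finish: chunk key -> ObjectRef.
    task_context = {
        "chunk-0": ray.put([1, 2, 3]),
        "chunk-1": ray.put([4, 5, 6]),
    }

    def get_chunks_result(data_keys):
        # Mirrors RayExecutionContext.get_chunks_result: resolve every key
        # to its ObjectRef, then fetch them all with one blocking ray.get.
        object_refs = [task_context[key] for key in data_keys]
        return ray.get(object_refs)

    assert get_chunks_result(["chunk-0", "chunk-1"]) == [[1, 2, 3], [4, 5, 6]]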
77 changes: 51 additions & 26 deletions mars/services/task/execution/ray/executor.py
@@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import functools
import logging
from typing import List, Dict, Any, Set
from .....core import ChunkGraph, Chunk, TileContext
@@ -113,22 +114,22 @@ def __init__(
config: ExecutionConfig,
task: Task,
tile_context: TileContext,
ray_executor: "ray.remote_function.RemoteFunction",
task_context: Dict[str, "ray.ObjectRef"],
task_state_actor: "ray.actor.ActorHandle",
lifecycle_api: LifecycleAPI,
meta_api: MetaAPI,
):
self._config = config
self._task = task
self._tile_context = tile_context
self._ray_executor = ray_executor
self._task_context = task_context
self._task_state_actor = task_state_actor
self._ray_executor = self._get_ray_executor()

# api
self._lifecycle_api = lifecycle_api
self._meta_api = meta_api

self._task_context = {}
self._available_band_resources = None

# For progress
@@ -148,19 +149,19 @@ async def create(
tile_context: TileContext,
**kwargs,
) -> "TaskExecutor":
ray_executor = ray.remote(execute_subtask)
lifecycle_api, meta_api = await cls._get_apis(session_id, address)
task_state_actor = (
ray.remote(RayTaskState)
.options(name=RayTaskState.gen_name(task.task_id))
.remote()
)
await cls._init_context(task_state_actor, session_id, address)
task_context = {}
await cls._init_context(task_context, task_state_actor, session_id, address)
return cls(
config,
task,
tile_context,
ray_executor,
task_context,
task_state_actor,
lifecycle_api,
meta_api,
@@ -174,13 +175,29 @@ async def _get_apis(cls, session_id: str, address: str):
MetaAPI.create(session_id, address),
)

@staticmethod
@functools.lru_cache(maxsize=1)
def _get_ray_executor():
# Export the remote function only once per process.
return ray.remote(execute_subtask)

@classmethod
async def _init_context(
cls, task_state_actor: "ray.actor.ActorHandle", session_id: str, address: str
cls,
task_context: Dict[str, "ray.ObjectRef"],
task_state_actor: "ray.actor.ActorHandle",
session_id: str,
address: str,
):
loop = asyncio.get_running_loop()
context = RayExecutionContext(
task_state_actor, session_id, address, address, address, loop=loop
task_context,
task_state_actor,
session_id,
address,
address,
address,
loop=loop,
)
await context.init()
set_context(context)
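_get_ray_executor wraps ray.remote(execute_subtask) in functools.lru_cache(maxsize=1), so the remote function is created (and exported to the cluster) once per process rather than once per TaskExecutor. A small sketch of the caching behavior, with a print standing in for the export:

    import functools

    @functools.lru_cache(maxsize=1)
    def get_executor():
        print("exported")  # with lru_cache this prints only once
        return object()    # stand-in for ray.remote(execute_subtask)

    assert get_executor() is get_executor()  # every call shares one instance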
@@ -194,7 +211,7 @@ async def execute_subtask_graph(
context: Any = None,
) -> Dict[Chunk, ExecutionChunkResult]:
logger.info("Stage %s start.", stage_id)
context = self._task_context
task_context = self._task_context
output_meta_object_refs = []
self._pre_all_stages_tile_progress = (
self._pre_all_stages_tile_progress + self._cur_stage_tile_progress
@@ -211,7 +228,7 @@
for subtask in subtask_graph.topological_iter():
subtask_chunk_graph = subtask.chunk_graph
key_to_input = await self._load_subtask_inputs(
stage_id, subtask, subtask_chunk_graph, context
stage_id, subtask, subtask_chunk_graph, task_context
)
output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
output_meta_keys = result_meta_keys & output_keys
@@ -235,32 +252,34 @@
meta_object_ref, *output_object_refs = output_object_refs
# TODO(fyrestone): Fetch (not get) the meta object here.
output_meta_object_refs.append(meta_object_ref)
context.update(zip(output_keys, output_object_refs))
task_context.update(zip(output_keys, output_object_refs))
logger.info("Submitted %s subtasks of stage %s.", len(subtask_graph), stage_id)

key_to_meta = {}
if len(output_meta_object_refs) > 0:
# TODO(fyrestone): Optimize update meta by fetching partial meta.
meta_count = len(output_meta_object_refs)
logger.info("Getting %s metas of stage %s.", meta_count, stage_id)
meta_list = await asyncio.gather(*output_meta_object_refs)
for meta in meta_list:
key_to_meta.update(meta)
assert len(key_to_meta) == len(result_meta_keys)
logger.info(
"Got %s metas of stage %s.", len(output_meta_object_refs), stage_id
)
logger.info("Got %s metas of stage %s.", meta_count, stage_id)

chunk_to_meta = {}
output_object_refs = []
# ray.wait requires the object refs to be unique, so collect them in a set.
output_object_refs = set()
for chunk in chunk_graph.result_chunks:
chunk_key = chunk.key
object_ref = context[chunk_key]
output_object_refs.append(object_ref)
object_ref = task_context[chunk_key]
output_object_refs.add(object_ref)
chunk_meta = key_to_meta.get(chunk_key)
if chunk_meta is not None:
chunk_to_meta[chunk] = ExecutionChunkResult(chunk_meta, object_ref)

logger.info("Waiting for stage %s complete.", stage_id)
ray.wait(output_object_refs, fetch_local=False)
# asyncio.to_thread is patched for Python < 3.9 in mars/lib/aio/__init__.py.
await asyncio.to_thread(ray.wait, list(output_object_refs), fetch_local=False)
# Just use `self._cur_stage_tile_progress` as current stage progress
# because current stage is finished, its progress is 1.
self._pre_all_stages_progress += self._cur_stage_tile_progress
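Because ray.wait blocks, the stage-completion wait now runs through asyncio.to_thread (back-ported for Python < 3.9 in mars/lib/aio), keeping the event loop free for other coroutines such as progress queries. The pattern in miniature, with time.sleep standing in for the blocking ray.wait:

    import asyncio
    import time

    def blocking_wait():
        time.sleep(1)  # stands in for ray.wait(refs, fetch_local=False)
        return "done"

    async def main():
        # The blocking call runs in a worker thread, so the event loop stays
        # responsive (e.g. get_progress() can still be served meanwhile).
        result = await asyncio.to_thread(blocking_wait)
        assert result == "done"

    asyncio.run(main())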
@@ -279,14 +298,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
chunk_keys = []
for chunk in self._tile_context[tileable].chunks:
chunk_keys.append(chunk.key)
object_ref = self._task_context[chunk.key]
update_metas.append(
self._meta_api.set_chunk_meta.delay(
chunk,
bands=[],
object_ref=object_ref,
if chunk.key in self._task_context:
# Some tileable graphs may have result chunks that were never executed,
# for example:
#     r, b = cut(series, bins, retbins=True)
#     r_result = r.execute().fetch()
#     b_result = b.execute().fetch()  # <- this is the case
object_ref = self._task_context[chunk.key]
update_metas.append(
self._meta_api.set_chunk_meta.delay(
chunk,
bands=[],
object_ref=object_ref,
)
)
)
update_lifecycles.append(
self._lifecycle_api.track.delay(tileable.key, chunk_keys)
)
@@ -315,7 +340,7 @@ async def get_progress(self) -> float:
finished_objects, _ = ray.wait(
self._cur_stage_output_object_refs,
num_returns=total,
timeout=0.1,
timeout=0, # Avoid blocking the asyncio loop.
fetch_local=False,
)
stage_progress = (
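Changing the ray.wait timeout from 0.1 to 0 in get_progress turns the check into a pure non-blocking poll: it returns immediately with whatever has finished so far instead of stalling the asyncio loop for up to 100 ms. The polling pattern, assuming refs is the list of in-flight ObjectRefs:

    # Non-blocking poll: with timeout=0, ray.wait returns immediately.
    finished, _ = ray.wait(
        refs,
        num_returns=len(refs),
        timeout=0,          # do not block the caller at all
        fetch_local=False,  # only check completion, do not pull data locally
    )
    progress = len(finished) / len(refs)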
mars/services/task/execution/ray/tests/test_ray_execution_backend.py
@@ -20,10 +20,15 @@

from ......core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
from ......serialization import serialize
from ......tests.core import require_ray
from ......tests.core import require_ray, mock
from ......utils import lazy_import, get_chunk_params
from .....context import ThreadedServiceContext
from ....core import new_task_id
from ..context import RayRemoteObjectManager, _RayRemoteObjectContext
from ..context import (
RayExecutionContext,
RayRemoteObjectManager,
_RayRemoteObjectContext,
)
from ..executor import execute_subtask
from ..fetcher import RayFetcher

@@ -119,3 +124,27 @@ async def bar(self, a, b):
context.destroy_remote_object(name)
with pytest.raises(KeyError):
remote_object.foo(3, 4)

class MyException(Exception):
pass

class _ErrorRemoteObject:
def __init__(self):
raise MyException()

with pytest.raises(MyException):
context.create_remote_object(name, _ErrorRemoteObject)


@require_ray
def test_get_chunks_result(ray_start_regular_shared2):
value = 123
o = ray.put(value)

def fake_init(self):
pass

with mock.patch.object(ThreadedServiceContext, "__init__", new=fake_init):
context = RayExecutionContext({"abc": o}, None)
r = context.get_chunks_result(["abc"])
assert r == [value]
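test_get_chunks_result builds a RayExecutionContext without a running Mars cluster by patching ThreadedServiceContext.__init__ to a no-op, leaving only the task_context lookup path under test. The same isolation trick in miniature (Base and Derived are illustrative names, not Mars APIs):

    from unittest import mock

    class Base:
        def __init__(self):
            raise RuntimeError("needs a running service")

    class Derived(Base):
        def __init__(self, task_context, *args):
            super().__init__(*args)
            self._task_context = task_context

    # Patch the heavyweight base initializer away to unit-test Derived alone.
    with mock.patch.object(Base, "__init__", new=lambda self: None):
        d = Derived({"abc": 123})
        assert d._task_context["abc"] == 123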