Skip to content

Commit

Permalink
[Ray] Use main pool as owner when autoscale disabled (#2878)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang committed Apr 9, 2022
1 parent cfb23ef commit dc93f88
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
11 changes: 0 additions & 11 deletions mars/services/storage/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,17 +571,6 @@ async def _setup_storage(
):
backend = get_storage_backend(storage_backend)
storage_config = storage_config or dict()

from ..cluster import ClusterAPI

if backend.name == "ray":
try:
cluster_api = await ClusterAPI.create(self.address)
supervisor_address = (await cluster_api.get_supervisors())[0]
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
storage_config["owner"] = supervisor_address
except mo.ActorNotExist:
pass
init_params, teardown_params = await backend.setup(**storage_config)
client = backend(**init_params)
self._init_params[band_name][storage_backend] = init_params
Expand Down
28 changes: 25 additions & 3 deletions mars/services/storage/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,31 @@ async def start(self):
backends = storage_configs.get("backends")
options = storage_configs.get("default_config", dict())
transfer_block_size = options.get("transfer_block_size", None)
backend_config = {
backend: storage_configs.get(backend, dict()) for backend in backends
}
backend_config = {}
for backend in backends:
storage_config = storage_configs.get(backend, dict())
backend_config[backend] = storage_config
if backend == "ray":
# Specify supervisor as ray owner will be costly when mars do shuffle which there will be m*n objects
# need to specify supervisor as owner, so enable it only for auto scale to avoid data lost when scale
# in. This limit can be removed when ray support ownership transfer.
if (
self._config.get("scheduling", {})
.get("autoscale", {})
.get("enabled", False)
):
try:
from ...cluster.api import ClusterAPI

cluster_api = await ClusterAPI.create(self._address)
supervisor_address = (await cluster_api.get_supervisors())[0]
# ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
owner = supervisor_address
except mo.ActorNotExist:
owner = self._address
else:
owner = self._address
storage_config["owner"] = owner

await mo.create_actor(
StorageManagerActor,
Expand Down

0 comments on commit dc93f88

Please sign in to comment.