FEAT: force to specify worker ip and gpu idx when launching models (x…
ChengjieLi28 authored and qinxuye committed Apr 2, 2024
1 parent d896f84 commit b10c98b
Showing 6 changed files with 390 additions and 23 deletions.
6 changes: 6 additions & 0 deletions xinference/api/restful_api.py
@@ -693,6 +693,8 @@ async def launch_model(
peft_model_path = payload.get("peft_model_path", None)
image_lora_load_kwargs = payload.get("image_lora_load_kwargs", None)
image_lora_fuse_kwargs = payload.get("image_lora_fuse_kwargs", None)
worker_ip = payload.get("worker_ip", None)
gpu_idx = payload.get("gpu_idx", None)

exclude_keys = {
"model_uid",
@@ -707,6 +709,8 @@ async def launch_model(
"peft_model_path",
"image_lora_load_kwargs",
"image_lora_fuse_kwargs",
"worker_ip",
"gpu_idx",
}

kwargs = {
@@ -734,6 +738,8 @@ async def launch_model(
peft_model_path=peft_model_path,
image_lora_load_kwargs=image_lora_load_kwargs,
image_lora_fuse_kwargs=image_lora_fuse_kwargs,
worker_ip=worker_ip,
gpu_idx=gpu_idx,
**kwargs,
)

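For orientation, a minimal sketch of a launch request carrying the two new payload keys follows. Only worker_ip and gpu_idx come from this diff; the /v1/models path, the endpoint address, and the other payload fields are assumptions about the surrounding API.

# Sketch of a launch request with the new keys; endpoint path, address,
# and the non-new payload fields are assumptions, not part of this diff.
import requests

payload = {
    "model_name": "my-llm",          # placeholder model name
    "model_size_in_billions": 7,
    "model_format": "pytorch",
    # New in this commit: pin the model to a worker and to specific GPUs.
    "worker_ip": "192.168.0.12",
    "gpu_idx": [0, 1],
}
resp = requests.post("http://127.0.0.1:9997/v1/models", json=payload)
resp.raise_for_status()
print(resp.json())
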
8 changes: 8 additions & 0 deletions xinference/client/restful/restful_client.py
@@ -795,6 +795,8 @@ def launch_model(
peft_model_path: Optional[str] = None,
image_lora_load_kwargs: Optional[Dict] = None,
image_lora_fuse_kwargs: Optional[Dict] = None,
worker_ip: Optional[str] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
) -> str:
"""
@@ -828,6 +830,10 @@ def launch_model(
lora load parameters for image model
image_lora_fuse_kwargs: Optional[Dict]
lora fuse parameters for image model
worker_ip: Optional[str]
Specify the worker ip where the model is located in a distributed scenario.
gpu_idx: Optional[Union[int, List[int]]]
Specify the GPU index where the model is located.
**kwargs:
Any other parameters been specified.
@@ -853,6 +859,8 @@ def launch_model(
"peft_model_path": peft_model_path,
"image_lora_load_kwargs": image_lora_load_kwargs,
"image_lora_fuse_kwargs": image_lora_fuse_kwargs,
"worker_ip": worker_ip,
"gpu_idx": gpu_idx,
}

for key, value in kwargs.items():
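A hedged usage sketch of the updated client method follows; the endpoint address and model parameters are placeholders, and only worker_ip and gpu_idx are introduced by this diff.

# Usage sketch; the address and model parameters are placeholders.
from xinference.client import RESTfulClient

client = RESTfulClient("http://127.0.0.1:9997")
model_uid = client.launch_model(
    model_name="my-llm",
    model_size_in_billions=7,
    worker_ip="192.168.0.12",  # run the model on this worker in a cluster
    gpu_idx=[0, 1],            # and on these GPU indexes of that worker
)
print(model_uid)
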
33 changes: 32 additions & 1 deletion xinference/core/supervisor.py
@@ -92,6 +92,15 @@ def __init__(self):
def uid(cls) -> str:
return "supervisor"

def _get_worker_ref_by_ip(
self, ip: str
) -> Optional[xo.ActorRefType["WorkerActor"]]:
for addr, ref in self._worker_address_to_worker.items():
existing_ip = addr.split(":")[0]
if existing_ip == ip:
return ref
return None

async def __post_create__(self):
self._uptime = time.time()
if not XINFERENCE_DISABLE_HEALTH_CHECK:
@@ -717,8 +726,25 @@ async def launch_builtin_model(
peft_model_path: Optional[str] = None,
image_lora_load_kwargs: Optional[Dict] = None,
image_lora_fuse_kwargs: Optional[Dict] = None,
worker_ip: Optional[str] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
) -> str:
target_ip_worker_ref = (
self._get_worker_ref_by_ip(worker_ip) if worker_ip is not None else None
)
if (
worker_ip is not None
and not self.is_local_deployment()
and target_ip_worker_ref is None
):
raise ValueError(f"Worker ip address {worker_ip} is not in the cluster.")
if worker_ip is not None and self.is_local_deployment():
logger.warning(
f"You specified the worker ip: {worker_ip} in local mode, "
f"xinference will ignore this option."
)

if model_uid is None:
model_uid = self._gen_model_uid(model_name)

@@ -735,7 +761,11 @@ async def _launch_one_model(_replica_model_uid):
)

nonlocal model_type
-            worker_ref = await self._choose_worker()
+            worker_ref = (
+                target_ip_worker_ref
+                if target_ip_worker_ref is not None
+                else await self._choose_worker()
+            )
# LLM as default for compatibility
model_type = model_type or "LLM"
await worker_ref.launch_builtin_model(
Expand All @@ -750,6 +780,7 @@ async def _launch_one_model(_replica_model_uid):
peft_model_path=peft_model_path,
image_lora_load_kwargs=image_lora_load_kwargs,
image_lora_fuse_kwargs=image_lora_fuse_kwargs,
gpu_idx=gpu_idx,
**kwargs,
)
self._replica_model_uid_to_worker[_replica_model_uid] = worker_ref
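The matching worker-side changes (xinference/core/worker.py) are not shown in this extract. As a rough orientation for the tests below, here is a hypothetical sketch of the kind of GPU bookkeeping they exercise; apart from gpu_idx and _user_specified_gpu_to_model_uids, which appear in the tests, every name and rule in it is an assumption rather than the actual implementation.

# Hypothetical sketch only -- not the worker.py code from this commit.
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple


class GpuPlacementSketch:
    """Tracks which models occupy which GPUs, with optional user pinning."""

    def __init__(self, cuda_devices: List[int]):
        self._devices = cuda_devices
        # GPUs claimed by automatically placed LLMs (one per GPU).
        self._gpu_to_model_uid: Dict[int, str] = {}
        # GPUs requested explicitly via gpu_idx -> {(model_uid, model_type)}.
        self._user_specified_gpu_to_model_uids: Dict[int, Set[Tuple[str, str]]] = defaultdict(set)

    def _has_vllm_model(self, dev: int) -> bool:
        # Assumption: a vLLM-backed model wants exclusive use of its GPU
        # (the tests' mock detects this via a "vllm_" uid prefix).
        uids = [self._gpu_to_model_uid.get(dev)] + [
            uid for uid, _ in self._user_specified_gpu_to_model_uids[dev]
        ]
        return any(uid is not None and uid.startswith("vllm_") for uid in uids)

    def allocate(
        self, model_uid: str, model_type: str, gpu_idx: Optional[List[int]] = None
    ) -> List[int]:
        if gpu_idx is not None:
            # User-specified placement: refuse GPUs already holding a vLLM model.
            for dev in gpu_idx:
                if self._has_vllm_model(dev):
                    raise RuntimeError(f"GPU {dev} already hosts a vLLM model.")
            for dev in gpu_idx:
                self._user_specified_gpu_to_model_uids[dev].add((model_uid, model_type))
            return list(gpu_idx)
        # Automatic placement: pick a GPU that no model has claimed yet.
        for dev in self._devices:
            if dev not in self._gpu_to_model_uid and not self._user_specified_gpu_to_model_uids[dev]:
                self._gpu_to_model_uid[dev] = model_uid
                return [dev]
        raise RuntimeError("No free GPU available for automatic placement.")
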
187 changes: 185 additions & 2 deletions xinference/core/tests/test_worker.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List, Optional
+from typing import List, Optional, Union

import pytest
import pytest_asyncio
@@ -42,7 +42,14 @@ def get_gpu_to_model_uid(self):
def get_gpu_to_embedding_model_uids(self):
return self._gpu_to_embedding_model_uids

def get_user_specified_gpu_to_model_uids(self):
return self._user_specified_gpu_to_model_uids

async def is_model_vllm_backend(self, model_uid):
if model_uid.startswith("normal_"):
return False
if model_uid.startswith("vllm_"):
return True
for _dev in self._gpu_to_model_uid:
if model_uid == self._gpu_to_model_uid[_dev]:
return True
@@ -57,10 +64,11 @@ async def launch_builtin_model(
quantization: Optional[str],
model_type: str = "LLM",
n_gpu: Optional[int] = None,
gpu_idx: Optional[Union[int, List[int]]] = None,
**kwargs,
):
subpool_address, devices = await self._create_subpool(
-            model_uid, model_type, n_gpu=n_gpu
+            model_uid, model_type, n_gpu=n_gpu, gpu_idx=gpu_idx  # type: ignore
)
self._model_uid_to_addr[model_uid] = subpool_address

@@ -252,3 +260,178 @@ async def test_launch_embedding_model(setup_pool):
)
for i in range(1, 6):
await worker.terminate_model(f"model_model_{i}")


@pytest.mark.asyncio
async def test_launch_model_with_gpu_idx(setup_pool):
pool = setup_pool
addr = pool.external_address

worker: xo.ActorRefType["MockWorkerActor"] = await xo.create_actor(
MockWorkerActor,
address=addr,
uid=WorkerActor.uid(),
supervisor_address="test",
main_pool=pool,
cuda_devices=[i for i in range(4)],
)

# test normal model
await worker.launch_builtin_model(
"normal_model_model_1", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 1
assert 0 in llm_info

await worker.launch_builtin_model(
"model_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0]
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 1
assert 0 in llm_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info) == 1
assert 0 in user_specified_info
assert len(user_specified_info[0]) == 1
assert list(user_specified_info[0])[0][0] == "model_model_2"
assert list(user_specified_info[0])[0][1] == "LLM"

# test vllm model
await worker.launch_builtin_model(
"vllm_model_model_3", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 2
assert 0 in llm_info
assert 1 in llm_info

with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[1]
)

await worker.launch_builtin_model(
"model_model_4", "mock_model_name", None, None, None, "LLM", gpu_idx=[2]
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 2
assert 0 in llm_info
assert 1 in llm_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info) == 2
assert 0 in user_specified_info
assert 2 in user_specified_info
assert len(user_specified_info[2]) == 1
assert list(user_specified_info[2])[0][0] == "model_model_4"
assert list(user_specified_info[2])[0][1] == "LLM"

# then launch a LLM without gpu_idx
await worker.launch_builtin_model(
"normal_model_model_5", "mock_model_name", None, None, None, "LLM", n_gpu=1
)
llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 3
assert 0 in llm_info
assert 1 in llm_info
assert 3 in llm_info

# launch without gpu_idx again, error
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"normal_model_model_6", "mock_model_name", None, None, None, "LLM", n_gpu=1
)

# test terminate and cleanup
await worker.terminate_model("normal_model_model_1")
await worker.terminate_model("model_model_2")
await worker.terminate_model("vllm_model_model_3")
await worker.terminate_model("model_model_4")
await worker.terminate_model("normal_model_model_5")

llm_info = await worker.get_gpu_to_model_uid()
assert len(llm_info) == 0

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
for idx, model_infos in user_specified_info.items():
assert len(model_infos) == 0

# next, test with embedding models
await worker.launch_builtin_model(
"embedding_1", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
assert len(embedding_info) == 1
assert 0 in embedding_info

await worker.launch_builtin_model(
"vllm_mock_model_2", "mock_model_name", None, None, None, "LLM", gpu_idx=[0]
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
assert len(embedding_info) == 1
assert 0 in embedding_info

user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert len(user_specified_info[0]) == 1
assert list(user_specified_info[0])[0][0] == "vllm_mock_model_2"
assert list(user_specified_info[0])[0][1] == "LLM"

# already has vllm model on gpu 0, error
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"rerank_3", "mock_model_name", None, None, None, "rerank", gpu_idx=[0]
)
# never choose gpu 0 again
with pytest.raises(RuntimeError):
await worker.launch_builtin_model(
"normal_mock_model_3", "mock_model_name", None, None, None, "LLM", n_gpu=4
)

# should be on gpu 1
await worker.launch_builtin_model(
"embedding_3", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
# should be on gpu 1
await worker.launch_builtin_model(
"rerank_4", "mock_model_name", None, None, None, "rerank", gpu_idx=[1]
)
# should be on gpu 2
await worker.launch_builtin_model(
"embedding_5", "mock_model_name", None, None, None, "embedding", n_gpu=1
)
# should be on gpu 3
await worker.launch_builtin_model(
"rerank_6", "mock_model_name", None, None, None, "rerank", n_gpu=1
)
# should be on gpu 2, since it has the fewest models
await worker.launch_builtin_model(
"rerank_7", "mock_model_name", None, None, None, "rerank", n_gpu=1
)
embedding_info = await worker.get_gpu_to_embedding_model_uids()
user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
assert "rerank_7" in embedding_info[2]
assert len(embedding_info[0]) == 1
assert len(user_specified_info[0]) == 1
assert len(embedding_info[1]) == 1
assert len(user_specified_info[1]) == 1
assert len(embedding_info[2]) == 2
assert len(user_specified_info[2]) == 0
assert len(embedding_info[3]) == 1
assert len(user_specified_info[3]) == 0

# cleanup
await worker.terminate_model("embedding_1")
await worker.terminate_model("vllm_mock_model_2")
await worker.terminate_model("embedding_3")
await worker.terminate_model("rerank_4")
await worker.terminate_model("embedding_5")
await worker.terminate_model("rerank_6")
await worker.terminate_model("rerank_7")

embedding_info = await worker.get_gpu_to_embedding_model_uids()
user_specified_info = await worker.get_user_specified_gpu_to_model_uids()
for info in [embedding_info, user_specified_info]:
for dev, details in info.items():
assert len(details) == 0