Skip to content

Commit

Permalink
fix rpyc tcp delay (#645)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiworldwzj authored Dec 4, 2024
1 parent 3928f5b commit a1d0e47
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lightllm/server/router/model_infer/model_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ async def get_max_total_token_num(self):


def _init_env(args, port, info_queue, mem_queue, router_lock):
import lightllm.utils.rpyc_fix_utils as _

# 注册graceful 退出的处理
from lightllm.utils.graceful_utils import graceful_registry
import inspect
Expand All @@ -263,6 +265,8 @@ def _init_env(args, port, info_queue, mem_queue, router_lock):


async def start_model_process(args, port, world_size, info_queue: mp.Queue, mem_queue: mp.Queue, router_lock: mp.Queue):
import lightllm.utils.rpyc_fix_utils as _

# 单卡时不使用 rpc
if world_size == 1:
return ModelRpcClient(ModelRpcServer(args, info_queue, mem_queue), world_size)
Expand Down
45 changes: 45 additions & 0 deletions lightllm/utils/rpyc_fix_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import sys
import errno
import socket
from rpyc.lib.compat import get_exc_errno
from rpyc.core.stream import SocketStream
from rpyc.utils.server import Server


def fix_connect(cls, host, port, **kwargs):
if kwargs.pop("ipv6", False):
kwargs["family"] = socket.AF_INET6
kwargs["nodelay"] = True
return cls(cls._connect(host, port, **kwargs))


SocketStream.connect = classmethod(fix_connect)


def fix_accept(self):
"""accepts an incoming socket connection (blocking)"""
while self.active:
try:
sock, addrinfo = self.listener.accept()
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except socket.timeout:
pass
except socket.error:
ex = sys.exc_info()[1]
if get_exc_errno(ex) in (errno.EINTR, errno.EAGAIN):
pass
else:
raise EOFError()
else:
break

if not self.active:
return

sock.setblocking(True)
self.logger.info(f"accepted {addrinfo} with fd {sock.fileno()}")
self.clients.add(sock)
self._accept_method(sock)


Server.accept = fix_accept

0 comments on commit a1d0e47

Please sign in to comment.