Skip to content

Commit 618ccdb

Browse files
rainyfly, EmmonsCurse, Jiang-Jia-Jun
authored
[Feature] Support mixed deployment with yiyan adapter in develop (#3976)
* [Feature] Support mixed deployment with yiyan adapter in release2.2
* fix metrics
* add unit test
* add unit test
* add unit test
* fix ci
* fix for eb5
* fix ci
* fix ci
* fix ci

---------

Co-authored-by: YuBaoku <[email protected]>
Co-authored-by: Jiang-Jia-Jun <[email protected]>
1 parent 2745f37 commit 618ccdb

File tree

14 files changed

+934
-176
lines changed

14 files changed

+934
-176
lines changed

.github/workflows/_pre_ce_test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ jobs:
8282
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
8383
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
8484
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
85+
FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((42048 + DEVICE_PORT * 100))
86+
FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((42038 + DEVICE_PORT * 100))
87+
FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((42028 + DEVICE_PORT * 100))
8588
echo "Test ENV Parameter:"
8689
echo "========================================================="
8790
echo "FLASK_PORT=${FLASK_PORT}"

fastdeploy/engine/common_engine.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
EngineCacheQueue,
3838
EngineWorkerQueue,
3939
IPCSignal,
40-
ZmqClient,
40+
ZmqIpcServer,
41+
ZmqTcpServer,
4142
)
4243
from fastdeploy.metrics.metrics import main_process_metrics
4344
from fastdeploy.metrics.trace_util import start_span, start_span_request
4445
from fastdeploy.model_executor.guided_decoding import schema_checker
4546
from fastdeploy.plugins.token_processor import load_token_processor_plugins
47+
from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
4648
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
4749
from fastdeploy.utils import EngineError, envs, llm_logger
4850

@@ -576,9 +578,19 @@ def start_zmq_service(self, api_server_pid=None):
576578
if api_server_pid is None:
577579
return
578580
self.api_server_pid = api_server_pid
579-
self.zmq_server = ZmqClient(name=api_server_pid, mode=zmq.PULL)
580-
self.zmq_server.start_server()
581-
self.zmq_server.create_router()
581+
if envs.FD_ENABLE_INTERNAL_ADAPTER:
582+
self.recv_request_server = ZmqTcpServer(port=envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, mode=zmq.PULL)
583+
self.send_response_server = ZmqTcpServer(port=envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, mode=zmq.ROUTER)
584+
self.internal_adapter = InternalAdapter(
585+
cfg=self.cfg, engine=self, dp_rank=self.cfg.node_rank * self.cfg.worker_num_per_node
586+
)
587+
else:
588+
self.recv_request_server = ZmqIpcServer(name=api_server_pid, mode=zmq.PULL)
589+
self.send_response_server = ZmqIpcServer(name=api_server_pid, mode=zmq.ROUTER)
590+
self.recv_result_handle_thread = threading.Thread(
591+
target=self.send_response_server.recv_result_handle, daemon=True
592+
)
593+
self.recv_result_handle_thread.start()
582594
time.sleep(3)
583595
self.insert_task_to_scheduler_thread = threading.Thread(target=self._insert_zmq_task_to_scheduler, daemon=True)
584596
self.insert_task_to_scheduler_thread.start()
@@ -592,9 +604,9 @@ def _insert_zmq_task_to_scheduler(self):
592604
try:
593605
block = True if len(added_requests) == 0 else False
594606
if not self.cfg.model_config.enable_mm:
595-
err, data = self.zmq_server.receive_json_once(block)
607+
err, data = self.recv_request_server.receive_json_once(block)
596608
else:
597-
err, data = self.zmq_server.receive_pyobj_once(block)
609+
err, data = self.recv_request_server.receive_pyobj_once(block)
598610
if err is not None:
599611
llm_logger.error(f"Engine stops inserting zmq task into scheduler, err:{err}")
600612
break
@@ -648,7 +660,7 @@ def _insert_zmq_task_to_scheduler(self):
648660
)
649661
# Since the request is not in scheduler
650662
# Send result by zmq directly
651-
self.zmq_server.send_multipart(request_id, [error_result])
663+
self.send_response_server.send_response(request_id, [error_result])
652664
except Exception as e:
653665
llm_logger.error(
654666
f"Error happened while receiving new request from zmq, details={e}, "
@@ -666,7 +678,7 @@ def _zmq_send_generated_tokens(self):
666678
time.sleep(0.005)
667679
continue
668680
for request_id, contents in results.items():
669-
self.zmq_server.send_multipart(request_id, contents)
681+
self.send_response_server.send_response(request_id, contents)
670682

671683
except Exception as e:
672684
llm_logger.error(f"Unexcepted error happened: {e}, {traceback.format_exc()!s}")
@@ -766,5 +778,9 @@ def _exit_sub_services(self):
766778
self.worker_healthy_live_signal.clear()
767779
self.exist_prefill_task_signal.clear()
768780
self.model_weights_status_signal.clear()
769-
if hasattr(self, "zmq_server") and self.zmq_server is not None:
770-
self.zmq_server.close()
781+
if hasattr(self, "send_response_server") and self.send_response_server is not None:
782+
self.send_response_server.close()
783+
if hasattr(self, "recv_request_server") and self.recv_request_server is not None:
784+
self.recv_request_server.close()
785+
if hasattr(self, "recv_control_cmd_server") and self.recv_control_cmd_server is not None:
786+
self.recv_control_cmd_server.close()

fastdeploy/entrypoints/engine_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from fastdeploy.entrypoints.openai.utils import DealerConnectionManager
2828
from fastdeploy.envs import FD_SUPPORT_MAX_CONNECTIONS
2929
from fastdeploy.input.preprocess import InputPreprocessor
30-
from fastdeploy.inter_communicator import IPCSignal, ZmqClient
30+
from fastdeploy.inter_communicator import IPCSignal, ZmqIpcClient
3131
from fastdeploy.metrics.work_metrics import work_process_metrics
3232
from fastdeploy.multimodal.registry import MultimodalRegistry
3333
from fastdeploy.platforms import current_platform
@@ -115,7 +115,7 @@ def create_zmq_client(self, model, mode):
115115
"""
116116
Create a ZMQ client.
117117
"""
118-
self.zmq_client = ZmqClient(model, mode)
118+
self.zmq_client = ZmqIpcClient(model, mode)
119119
self.zmq_client.connect()
120120

121121
async def format_and_add_data(self, prompts: dict):

fastdeploy/envs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@
9898
# Whether to use new get_output and save_output method (0 or 1)
9999
"FD_USE_GET_SAVE_OUTPUT_V1": lambda: bool(int(os.getenv("FD_USE_GET_SAVE_OUTPUT_V1", "0"))),
100100
# Whether to enable model cache feature
101+
"FD_ENABLE_MODEL_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_CACHE", "0"))),
102+
# Whether to enable the internal adapter module that accesses LLMEngine.
103+
"FD_ENABLE_INTERNAL_ADAPTER": lambda: int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "0")),
104+
# Port on which LLMEngine receives requests, used when FD_ENABLE_INTERNAL_ADAPTER=1
105+
"FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8200"),
106+
# LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1
107+
"FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8201"),
108+
# Port on which LLMEngine receives control commands, used when FD_ENABLE_INTERNAL_ADAPTER=1
109+
"FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"),
101110
"FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
102111
}
103112

fastdeploy/inter_communicator/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
from .engine_cache_queue import EngineCacheQueue
1818
from .engine_worker_queue import EngineWorkerQueue
1919
from .ipc_signal import IPCSignal, shared_memory_exists
20-
from .zmq_client import ZmqClient
20+
from .zmq_client import ZmqIpcClient
21+
from .zmq_server import ZmqIpcServer, ZmqTcpServer
2122

22-
__all__ = ["ZmqClient", "IPCSignal", "EngineWorkerQueue", "EngineCacheQueue", "shared_memory_exists"]
23+
__all__ = [
24+
"ZmqIpcClient",
25+
"IPCSignal",
26+
"EngineWorkerQueue",
27+
"EngineCacheQueue",
28+
"ZmqTcpServer",
29+
"ZmqIpcServer",
30+
"shared_memory_exists",
31+
]

fastdeploy/inter_communicator/zmq_client.py

Lines changed: 43 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -14,209 +14,100 @@
1414
# limitations under the License.
1515
"""
1616

17-
import os
18-
import threading
19-
import time
20-
import traceback
17+
from abc import ABC, abstractmethod
2118

22-
import msgpack
2319
import zmq
2420

25-
from fastdeploy import envs
26-
from fastdeploy.utils import zmq_client_logger
21+
from fastdeploy.utils import llm_logger
2722

2823

29-
class ZmqClient:
24+
class ZmqClientBase(ABC):
3025
"""
31-
ZmqClient is a class that provides a client-side interface for sending and receiving messages using ZeroMQ.
26+
ZmqClientBase is a base class that provides a client-side interface for sending and receiving messages using ZeroMQ.
3227
"""
3328

34-
def __init__(self, name, mode):
35-
self.context = zmq.Context(4)
36-
self.socket = self.context.socket(mode)
37-
self.file_name = f"/dev/shm/{name}.socket"
38-
self.router_path = f"/dev/shm/router_{name}.ipc"
29+
def __init__(self):
30+
pass
3931

40-
self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM)
41-
self.aggregate_send = envs.FD_USE_AGGREGATE_SEND
32+
@abstractmethod
33+
def _create_socket(self):
34+
"""Abstract method to create and return a ZeroMQ socket."""
35+
pass
4236

43-
self.mutex = threading.Lock()
44-
self.req_dict = dict()
45-
self.router = None
46-
self.poller = None
47-
self.running = True
37+
def _ensure_socket(self):
38+
"""Ensure the socket is created before use."""
39+
if self.socket is None:
40+
self.socket = self._create_socket()
4841

42+
@abstractmethod
4943
def connect(self):
5044
"""
5145
Connect to the server using the file name specified in the constructor.
5246
"""
53-
self.socket.connect(f"ipc://{self.file_name}")
54-
55-
def start_server(self):
56-
"""
57-
Start the server using the file name specified in the constructor.
58-
"""
59-
self.socket.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
60-
self.socket.setsockopt(zmq.SNDTIMEO, -1)
61-
self.socket.bind(f"ipc://{self.file_name}")
62-
self.poller = zmq.Poller()
63-
self.poller.register(self.socket, zmq.POLLIN)
64-
65-
def create_router(self):
66-
"""
67-
Create a ROUTER socket and bind it to the specified router path.
68-
"""
69-
self.router = self.context.socket(zmq.ROUTER)
70-
self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
71-
self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
72-
self.router.setsockopt(zmq.SNDTIMEO, -1)
73-
self.router.bind(f"ipc://{self.router_path}")
74-
zmq_client_logger.info(f"router path: {self.router_path}")
47+
pass
7548

7649
def send_json(self, data):
7750
"""
7851
Send a JSON-serializable object over the socket.
7952
"""
53+
self._ensure_socket()
8054
self.socket.send_json(data)
8155

8256
def recv_json(self):
8357
"""
8458
Receive a JSON-serializable object from the socket.
8559
"""
60+
self._ensure_socket()
8661
return self.socket.recv_json()
8762

8863
def send_pyobj(self, data):
8964
"""
9065
Send a Pickle-serializable object over the socket.
9166
"""
67+
self._ensure_socket()
9268
self.socket.send_pyobj(data)
9369

9470
def recv_pyobj(self):
9571
"""
9672
Receive a Pickle-serializable object from the socket.
9773
"""
74+
self._ensure_socket()
9875
return self.socket.recv_pyobj()
9976

100-
def pack_aggregated_data(self, data):
101-
"""
102-
Aggregate multiple responses into one and send them to the client.
103-
"""
104-
result = data[0]
105-
if len(data) > 1:
106-
for response in data[1:]:
107-
result.add(response)
108-
result = msgpack.packb([result.to_dict()])
109-
return result
110-
111-
def send_multipart(self, req_id, data):
112-
"""
113-
Send a multipart message to the router socket.
114-
"""
115-
if self.router is None:
116-
raise RuntimeError("Router socket not created. Call create_router() first.")
117-
118-
while self.running:
119-
with self.mutex:
120-
if req_id not in self.req_dict:
121-
try:
122-
client, _, request_id = self.router.recv_multipart(flags=zmq.NOBLOCK)
123-
req_id_str = request_id.decode("utf-8")
124-
self.req_dict[req_id_str] = client
125-
except zmq.Again:
126-
time.sleep(0.001)
127-
continue
128-
else:
129-
break
130-
if self.req_dict[req_id] == -1:
131-
if data[-1].finished:
132-
with self.mutex:
133-
self.req_dict.pop(req_id, None)
134-
return
135-
try:
136-
start_send = time.time()
137-
if self.aggregate_send:
138-
result = self.pack_aggregated_data(data)
139-
else:
140-
result = msgpack.packb([response.to_dict() for response in data])
141-
self.router.send_multipart([self.req_dict[req_id], b"", result])
142-
zmq_client_logger.info(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
143-
except zmq.ZMQError as e:
144-
zmq_client_logger.error(f"[{req_id}] zmq error: {e}")
145-
self.req_dict[req_id] = -1
146-
except Exception as e:
147-
zmq_client_logger.error(f"Send result to zmq client failed: {e}, {str(traceback.format_exc())}")
77+
@abstractmethod
78+
def close(self):
79+
pass
14880

149-
if data[-1].finished:
150-
with self.mutex:
151-
self.req_dict.pop(req_id, None)
152-
zmq_client_logger.info(f"send_multipart finished, req_id: {req_id}")
15381

154-
def receive_json_once(self, block=False):
155-
"""
156-
Receive a single message from the socket.
157-
"""
158-
if self.socket is None or self.socket.closed:
159-
return "zmp socket has closed", None
160-
try:
161-
flags = zmq.NOBLOCK if not block else 0
162-
return None, self.socket.recv_json(flags=flags)
163-
except zmq.Again:
164-
return None, None
165-
except Exception as e:
166-
self.close()
167-
zmq_client_logger.warning(f"{e}, {str(traceback.format_exc())}")
168-
return str(e), None
82+
class ZmqIpcClient(ZmqClientBase):
83+
def __init__(self, name, mode):
84+
self.name = name
85+
self.mode = mode
86+
self.file_name = f"/dev/shm/{name}.socket"
87+
self.context = zmq.Context()
88+
self.socket = self.context.socket(self.mode)
16989

170-
def receive_pyobj_once(self, block=False):
171-
"""
172-
Receive a single message from the socket.
173-
"""
174-
if self.socket is None or self.socket.closed:
175-
return "zmp socket has closed", None
176-
try:
177-
flags = zmq.NOBLOCK if not block else 0
178-
return None, self.socket.recv_pyobj(flags=flags)
179-
except zmq.Again:
180-
return None, None
181-
except Exception as e:
182-
self.close()
183-
zmq_client_logger.warning(f"{e}, {str(traceback.format_exc())}")
184-
return str(e), None
90+
def _create_socket(self):
91+
"""create and return a ZeroMQ socket."""
92+
self.context = zmq.Context()
93+
return self.context.socket(self.mode)
18594

186-
def _clear_ipc(self, name):
187-
"""
188-
Remove the IPC file with the given name.
189-
"""
190-
if os.path.exists(name):
191-
try:
192-
os.remove(name)
193-
except OSError as e:
194-
zmq_client_logger.warning(f"Failed to remove IPC file {name} - {e}")
95+
def connect(self):
96+
self._ensure_socket()
97+
self.socket.connect(f"ipc://{self.file_name}")
19598

19699
def close(self):
197100
"""
198-
Close the socket and context, and remove the IPC files.
101+
Close the socket and context.
199102
"""
200-
if not self.running:
201-
return
202-
203-
self.running = False
204-
zmq_client_logger.info("Closing ZMQ connection...")
103+
llm_logger.info("ZMQ client is closing connection...")
205104
try:
206-
if hasattr(self, "socket") and not self.socket.closed:
105+
if self.socket is not None and not self.socket.closed:
106+
self.socket.setsockopt(zmq.LINGER, 0)
207107
self.socket.close()
208-
209-
if self.router is not None and not self.router.closed:
210-
self.router.close()
211-
212-
if not self.context.closed:
108+
if self.context is not None:
213109
self.context.term()
214110

215-
self._clear_ipc(self.file_name)
216-
self._clear_ipc(self.router_path)
217111
except Exception as e:
218-
zmq_client_logger.warning(f"Failed to close ZMQ connection - {e}, {str(traceback.format_exc())}")
112+
llm_logger.warning(f"ZMQ client failed to close connection - {e}")
219113
return
220-
221-
def __exit__(self, exc_type, exc_val, exc_tb):
222-
self.close()

0 commit comments

Comments
 (0)