Commit 993fb82

Initial revision of the LLMActor.
1 parent c703518 commit 993fb82

File tree

Cluster/InfernLLMActor.py
Cluster/InfernLLMWorker.py
Cluster/LLMSession.py
examples/llm_test.py
examples/voice_ass.py

5 files changed: +337 -0 lines changed

Cluster/InfernLLMActor.py

+67
@@ -0,0 +1,67 @@
from typing import Dict
from uuid import UUID
from queue import Queue

import ray

from Cluster.InfernLLMWorker import InfernLLMWorker
from Cluster.LLMSession import LLMSession, LLMRequest, LLMInferRequest

@ray.remote(num_gpus=1.0, resources={"llm": 1})
class InfernLLMActor():
    debug = True
    sessions: Dict[UUID, LLMSession]
    llm: InfernLLMWorker

    def __init__(self):
        super().__init__()
        self.sessions = {}

    def start(self):
        # Try accelerators in order of preference, falling back to CPU.
        for device in ('xpu', 'cuda', 'cpu'):
            try:
                self.llm = InfernLLMWorker(device)
            except (ValueError, RuntimeError):
                continue
            break
        else:
            raise RuntimeError('Failed to initialize LLM')
        self.llm.start()
        # Warm the model up with one full batch of dummy requests and wait
        # for all of them to come back before reporting the actor as ready.
        tq = Queue()
        def res_cb(result): tq.put(result)
        irs = tuple(LLMInferRequest(LLMRequest('What is your name?', None), [{}])
                    for _ in range(self.llm.max_batch_size))
        for _i in irs: _i.textout_cb = res_cb
        with self.llm.inf_queue.mutex:
            for ir in irs:
                self.llm.inf_queue.queue.append(ir)
            self.llm.inf_queue.not_empty.notify()
        for _ in irs:
            tq.get()

    def stop(self):
        self.llm.stop()

    def new_llm_session(self):
        if self.debug: print('InfernLLMActor.new_llm_session')
        sess = LLMSession(self.llm)
        self.sessions[sess.id] = sess
        return sess.id

    def llm_session_end(self, sess_id):
        if self.debug: print('InfernLLMActor.llm_session_end')
        sess = self.sessions[sess_id]
        sess.stop()
        del self.sessions[sess_id]

    def llm_session_textin(self, sess_id, req:LLMRequest):
        if self.debug: print('InfernLLMActor.llm_session_textin')
        sess = self.sessions[sess_id]
        sess.textin(req)
        return sess_id

    def llm_session_context_add(self, sess_id, content:str, role:str = 'user'):
        if self.debug: print('InfernLLMActor.llm_session_context_add')
        sess = self.sessions[sess_id]
        sess.context_add(content, role)
        return sess_id
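
A minimal driver sketch for the actor above, assuming a single-node Ray setup with one GPU and the custom "llm" resource. The prompt text and the print_result callback are illustrative only; note that the callback is pickled with the request and runs inside the actor process, so its output lands in the actor's log.

import ray

from Cluster.InfernLLMActor import InfernLLMActor
from Cluster.LLMSession import LLMRequest

# Assumption: the local node exposes one GPU plus the custom "llm" resource
# that the @ray.remote decorator above asks for.
ray.init(num_gpus=1, resources={'llm': 1})

llm_actor = InfernLLMActor.remote()
ray.get(llm_actor.start.remote())        # loads the model and runs the warm-up batch

sess_id = ray.get(llm_actor.new_llm_session.remote())

def print_result(result):                # hypothetical callback; runs inside the actor
    print('LLM says:', result.text)

req = LLMRequest('What are your business hours?', print_result)
# Returns the session id right away; the reply is delivered asynchronously
# through print_result once the batched worker gets to this request.
ray.get(llm_actor.llm_session_textin.remote(sess_id, req))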

Cluster/InfernLLMWorker.py

+119
@@ -0,0 +1,119 @@
from typing import Tuple, List, Iterator
from os.path import exists as path_exists
from itertools import chain
from functools import partial

import torch
import torch.nn.functional as F

from transformers import TextStreamer

from Cluster.InfernBatchedWorker import InfernBatchedWorker
from Cluster.InfernTTSWorker import get_torch_hw
from Cluster.LLMSession import LLMResult, LLMInferRequest

class ResultsStreamer(TextStreamer):
    debug = False
    # Flush partial output to the callbacks whenever one of these sentence
    # boundaries appears in the newly decoded text.
    sync_on = ('. ', '? ', '! ', '\n')
    decode_batch_size = 8

    def __init__(self, wis:List[LLMInferRequest], upper:'InfernLLMWorker'):
        super().__init__(tokenizer=upper.llm_tokenizer)
        self.wi_cbs = tuple(wi.textout_cb for wi in wis)
        self.newLLMResult = tuple(partial(LLMResult, req_id=wi.req.id) for wi in wis)
        batch_size = len(wis)
        self.oposs = [0 for _ in range(batch_size)]
        self.current_tokens = None
        self.batch_decode = partial(upper.llm_tokenizer.batch_decode, skip_special_tokens=True)

    def put(self, token_ids):
        if self.current_tokens is None:
            # The first put() delivers the prompt tokens; skip them.
            self.current_tokens = torch.zeros((token_ids.shape[0], 0), dtype=torch.long)
            return
        if token_ids.dim() == 1:  # Shape [batch_size]
            token_ids = token_ids.unsqueeze(1)
        self.current_tokens = torch.cat([self.current_tokens, token_ids], dim=1)
        # Only re-decode the accumulated tokens every decode_batch_size steps.
        if self.current_tokens.shape[1] % self.decode_batch_size != 0:
            return
        results = self.batch_decode(self.current_tokens)
        for (ir, r), op, cb, newLR in zip(enumerate(results), self.oposs, self.wi_cbs, self.newLLMResult):
            new_content = r[op:]
            if len(new_content) == 0: continue
            sp = (op + pos + len(c) for c in self.sync_on if (pos:=new_content.rfind(c)) >= 0)
            try:
                spos = next(sp)
            except StopIteration:
                continue
            r = r[op:spos-1]
            if len(r) < 10: continue
            cb(result=newLR(r))
            self.oposs[ir] = spos
        if self.debug:
            print(f'{self.oposs=} {self.current_tokens.shape=}')

    def end(self):
        if self.debug:
            print(f'finished: {self.current_tokens.shape=}')
        # Flush whatever is left past each stream's last sync point.
        results = self.batch_decode(self.current_tokens)
        for r, op, cb, newLR in zip(results, self.oposs, self.wi_cbs, self.newLLMResult):
            if len(r) == op: continue
            cb(result=newLR(r[op:]))
        del self.current_tokens
        del self.wi_cbs

class InfernLLMWorker(InfernBatchedWorker):
    model_name = "Qwen/Qwen2.5-14B-Instruct"
    model_cache_dir = f"/tmp/saved_model.{model_name}"
    max_batch_size: int = 8
    debug = True
    llm_model: object
    llm_tokenizer: object
    output_sr: int

    def __init__(self, device=None):
        from warnings import filterwarnings
        filterwarnings("ignore", category=FutureWarning)
        filterwarnings("ignore", category=UserWarning)
        from transformers import AutoTokenizer
        from ipex_llm.transformers import AutoModelForCausalLM
        super().__init__()
        if device is None:
            device = get_torch_hw()
        def load_model(mn):
            m = AutoModelForCausalLM.from_pretrained(mn, torch_dtype="auto",
                                                     device_map="auto",
                                                     optimize_model=True,
                                                     trust_remote_code=True,
                                                     load_in_4bit=True,
                                                     use_cache=True
                                                     )
            if mn != self.model_cache_dir:
                # Cache the quantized weights so later starts can skip the conversion.
                m.save_low_bit(self.model_cache_dir)
            return m.to(device)
        if path_exists(self.model_cache_dir):
            try:
                model = AutoModelForCausalLM.load_low_bit(self.model_cache_dir,
                                                          trust_remote_code=True)
            except Exception:
                model = load_model(self.model_name)
        else:
            model = load_model(self.model_name)
        self.llm_model = model.to(device)
        self.llm_tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def process_batch(self, wis:List[LLMInferRequest]):
        if self.debug:
            print(f'InfernLLMWorker.process_batch: got {len(wis)=}')
        streamer = ResultsStreamer(wis, self)
        with torch.no_grad():
            messages = [self.llm_tokenizer.apply_chat_template(list(r.context), tokenize=False,
                                                               add_generation_prompt=True)
                        for r in wis]
            model_inputs = self.llm_tokenizer(messages, return_tensors="pt", padding=True).to(self.llm_model.device)
            self.llm_model.generate(
                **model_inputs,
                max_new_tokens=16 * 1024,
                output_scores=True,
                return_dict_in_generate=True,
                streamer=streamer,
            )
        if self.llm_model.device.type == 'xpu':
            torch.xpu.synchronize()
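
The flushing rule in ResultsStreamer.put() is the core of the streaming path: each stream keeps an offset into its decoded text and only emits a chunk once a sentence boundary shows up past that offset. A self-contained toy of that bookkeeping, with no torch or transformers involved and illustrative strings only (the real class additionally skips flushes shorter than 10 characters):

SYNC_ON = ('. ', '? ', '! ', '\n')

def flush_ready(decoded: str, opos: int):
    """Return (chunk, new_opos); chunk is None until a boundary appears past opos."""
    new_content = decoded[opos:]
    candidates = [opos + pos + len(c)
                  for c in SYNC_ON
                  if (pos := new_content.rfind(c)) >= 0]
    if not candidates:
        return None, opos
    spos = candidates[0]                  # first marker in SYNC_ON priority order
    return decoded[opos:spos - 1], spos   # drop the trailing separator, advance the offset

decoded, opos = '', 0
for piece in ('Hello, thanks ', 'for calling. How can ', 'I help you today?\n'):
    decoded += piece                      # stands in for batch_decode() of the token buffer
    chunk, opos = flush_ready(decoded, opos)
    if chunk:
        print(repr(chunk))                # 'Hello, thanks for calling.' then 'How can I help you today?'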

Cluster/LLMSession.py

+69
@@ -0,0 +1,69 @@
from typing import List, Tuple, Optional
from time import monotonic
from functools import partial
from uuid import uuid4, UUID

class LLMRequest():
    id: UUID
    text: str
    textout_cb: callable
    auto_ctx_add: bool = True
    def __init__(self, text:str, textout_cb:callable):
        self.text, self.textout_cb = text, textout_cb
        self.id = uuid4()

class LLMResult():
    req_id: UUID
    text: str
    def __init__(self, text:str, req_id:UUID):
        self.text, self.req_id = text, req_id

class LLMInferRequest():
    req: LLMRequest
    context: Tuple[dict]
    textout_cb: callable

    def __init__(self, req:LLMRequest, context:List[dict]):
        self.req, self.context = req, tuple(context)

class LLMSession():
    id: UUID
    context: List[dict]
    debug: bool = False
    def __init__(self, llm):
        self.id = uuid4()
        self.context = [{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. " +
                         "You are a helpful voice auto-attendant for the company Sippy Software. " +
                         "Start by greeting the caller and asking how you can help. " +
                         "Keep your messages brief and concise to reduce latency. " +
                         "The model output is fed into the dumb TTS system for audio output: do not add any extended formatting."}]
        self.llm = llm

    def context_add(self, content:str, role:str = "user"):
        if self.debug:
            print(f'{monotonic():4.3f}: LLMSession.context_add: {self.context=}, {content=}')
        # Merge consecutive messages from the same role instead of appending a new turn.
        if len(self.context) > 0 and self.context[-1]["role"] == role:
            self.context[-1]["content"] += f' {content}'
        else:
            self.context.append({"role": role, "content": content})

    def textin(self, req:LLMRequest):
        if self.debug:
            print(f'{monotonic():4.3f}: LLMSession.textin: {req.text=}, {req.textout_cb=} {self.context=}')
        self.context_add(req.text)
        ireq = LLMInferRequest(req, self.context)
        if hasattr(req, '_proc_start_cb'):
            ireq._proc_start_cb = req._proc_start_cb
        ireq.textout_cb = partial(self.textout, req = req)
        self.llm.infer(ireq)

    def textout(self, req:LLMRequest, result:LLMResult):
        if self.debug:
            print(f'{monotonic():4.3f}: LLMSession.textout: {result.text=}')
        if req.auto_ctx_add:
            self.context_add(result.text, "assistant")
        req.textout_cb(result = result)

    def stop(self):
        if self.debug: print('LLMSession.stop')
        del self.llm
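
A local sketch of the LLMSession flow without Ray or a real model: a stub stands in for InfernLLMWorker and answers every request immediately, which is enough to see the callback path and the role-merging context bookkeeping. The StubLLM class and the canned reply are assumptions for illustration, and the snippet presumes the Cluster package is importable (e.g. run from the repository root).

from Cluster.LLMSession import LLMSession, LLMRequest, LLMResult

class StubLLM:
    def infer(self, ireq):
        # Echo a canned reply through the same callback path the real worker uses.
        reply = LLMResult('Hello! How can I help you today?', ireq.req.id)
        ireq.textout_cb(result=reply)

sess = LLMSession(StubLLM())

def on_text(result):
    print('assistant:', result.text)

sess.textin(LLMRequest('<Incoming call from "Doe Joe">', on_text))

# auto_ctx_add keeps both sides of the exchange in sess.context,
# merging consecutive messages that share the same role.
for msg in sess.context:
    print(msg['role'], ':', msg['content'][:60])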

examples/llm_test.py

+48
@@ -0,0 +1,48 @@
import ray
from sys import stderr
from time import monotonic
from uuid import UUID
from functools import partial
from time import sleep
from Cluster.InfernLLMActor import InfernLLMActor
from Cluster.LLMSession import LLMRequest, LLMResult

#@ray.remote(resources={"head": 1})
#class text_in(result):

class TimedLLMRequest(LLMRequest):
    queue_ts: float
    proc_start_ts: float
    def __init__(self, text:str, lms:UUID, lma:InfernLLMActor):
        tin = partial(self.text_in, lms=lms, lma=lma)
        super().__init__(text, tin)
        self.queue_ts = monotonic()

    def _proc_start_cb(self):
        self.proc_start_ts = monotonic()

    def text_in(self, result:LLMResult, lms:UUID, lma:InfernLLMActor):
        from sys import stderr as _stderr
        itime = monotonic() - self.proc_start_ts
        print(f'text_in: got {result=}, inference time: {itime}', file=_stderr)
        # Keep the conversation going: every reply immediately queues a new request.
        req = TimedLLMRequest('Hello, can I speak to the CEO?', lms, lma)
        lma.llm_session_textin.remote(lms, req)


ray.init(num_gpus=2, resources = {'llm':1,'head':1})

print('Initializing InfernLLMActor...', file=stderr)
llm_actor = InfernLLMActor.remote()
ray.get(llm_actor.start.remote())
print('InfernLLMActor is ready', file=stderr)


flms = [llm_actor.new_llm_session.remote() for _ in range(100)]
print(f'Created {len(flms)} sessions', file=stderr)
def sess(lms):
    req = TimedLLMRequest('<Incoming call from "Doe Joe" +11233742223>', lms, llm_actor)
    return llm_actor.llm_session_textin.remote(lms, req)
futs = [sess(lms) for lms in flms]
for f in futs:
    ray.get(f)
sleep(3600)

examples/voice_ass.py

+34
@@ -0,0 +1,34 @@
import torch
from transformers import AutoTokenizer, AutoConfig
from ipex_llm.transformers import AutoModelForCausalLM
from datetime import datetime

model_name = "Qwen/Qwen2.5-Coder-14B-Instruct"
#config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
#local_cache = f"~/.cache/Infernos/{model_name}"
#config.save_pretrained(local_cache)

model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto",
                                             device_map="auto",
                                             load_in_4bit=True,
                                             optimize_model=True,
                                             trust_remote_code=True,
                                             use_cache=True
                                             )
#model = model.half().to("xpu")
model = model.to("xpu")
tokenizer = AutoTokenizer.from_pretrained(model_name)
messages = [{"role": "system",
             "content": "You are Qwen, created by Alibaba Cloud. You are a helpful voice auto-attendant "
                        "for the company Sippy Software. Start by greeting the caller and asking how you "
                        "can help. Try to keep your messages brief and concise to reduce latency."},
            {"role": "system",
             "content": f'<Now is {datetime.now()}> <Incoming call from "Doe Joe" +11233742223>'}]
text = tokenizer.apply_chat_template(messages,
                                     tokenize=False,
                                     add_generation_prompt=True
                                     )
for i in range(10):
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(**model_inputs, max_new_tokens=16 * 1024, output_scores=True, return_dict_in_generate=True)
    torch.xpu.synchronize()
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids.sequences)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    print(messages, response)
