-
Notifications
You must be signed in to change notification settings - Fork 21
/
main.py
321 lines (302 loc) · 18.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
import asyncio
import json
import openai
import plugins
import os
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_message import ChatMessage
from channel.wechat.wechat_channel import WechatChannel
from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel
from channel.wechatmp.wechatmp_channel import WechatMPChannel
from config import conf
from plugins import *
from common.log import logger
from plugins.newgpt_turbo.lib import function as fun, get_stock_info as stock, search_google as google
from datetime import datetime
from bridge.bridge import Bridge
def create_channel_object():
channel_type = conf().get("channel_type")
if channel_type in ['wechat', 'wx', 'wxy']:
return WechatChannel()
elif channel_type == 'wechatmp':
return WechatMPChannel()
elif channel_type == 'wechatmp_service':
return WechatMPChannel()
elif channel_type == 'wechatcom_app':
return WechatComAppChannel()
elif channel_type == 'wework':
from channel.wework.wework_channel import WeworkChannel
return WeworkChannel()
elif channel_type == 'weworktop':
from channel.weworktop.weworktop_channel import WeworkChannel
return WeworkChannel()
elif channel_type == 'ntchat':
from channel.wechatnt.ntchat_channel import NtchatChannel
return NtchatChannel()
else:
return WechatChannel()
@plugins.register(name="NewGpt_Turbo", desc="GPT函数调用,极速联网", desire_priority=99, version="0.1",
author="chazzjimel", )
class NewGpt(Plugin):
def __init__(self):
super().__init__()
curdir = os.path.dirname(__file__)
config_path = os.path.join(curdir, "config.json")
functions_path = os.path.join(curdir, "lib", "functions.json")
logger.info(f"[newgpt_turbo] current directory: {curdir}")
logger.info(f"加载配置文件: {config_path}")
if not os.path.exists(config_path):
logger.info('[RP] 配置文件不存在,将使用config.json.template模板')
config_path = os.path.join(curdir, "config.json.template")
logger.info(f"[newgpt_turbo] config template path: {config_path}")
try:
with open(functions_path, 'r', encoding="utf-8") as f:
functions = json.load(f)
self.functions = functions
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
logger.debug(f"[newgpt_turbo] config content: {config}")
self.openai_api_key = config.get("open_ai_api_key")
self.openai_api_base = config.get("open_ai_api_base", "https://api.openai.com/v1")
self.alapi_key = config["alapi_key"]
self.bing_subscription_key = config["bing_subscription_key"]
self.google_api_key = config["google_api_key"]
self.google_cx_id = config["google_cx_id"]
self.functions_openai_model = config["functions_openai_model"]
self.assistant_openai_model = config["assistant_openai_model"]
self.app_key = config["app_key"]
self.app_sign = config["app_sign"]
self.temperature = config.get("temperature", 0.9)
self.max_tokens = config.get("max_tokens", 1000)
self.google_base_url = config.get("google_base_url", "https://www.googleapis.com/customsearch/v1?")
self.comapp = create_channel_object()
self.prompt = config["prompt"]
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
logger.info("[newgpt_turbo] inited")
except Exception as e:
if isinstance(e, FileNotFoundError):
logger.warn(f"[RP] init failed, config.json not found.")
else:
logger.warn("[RP] init failed." + str(e))
raise e
def on_handle_context(self, e_context: EventContext):
if e_context["context"].type not in [ContextType.TEXT]:
return
reply = Reply() # 创建一个回复对象
reply.type = ReplyType.TEXT
context = e_context['context'].content[:]
logger.info("newgpt_turbo query=%s" % context)
all_sessions = Bridge().get_bot("chat").sessions
session = all_sessions.session_query(context, e_context["context"]["session_id"], add_to_history=False)
logger.debug("session.messages:%s" % session.messages)
if len(session.messages) > 2:
input_messages = session.messages[-2:]
else:
input_messages = session.messages[-1:]
input_messages.append({"role": "user", "content": context})
logger.debug("input_messages:%s" % input_messages)
conversation_output = self.run_conversation(input_messages, e_context)
if conversation_output is not None:
_reply = conversation_output
logger.debug("conversation_output:%s" % conversation_output)
all_sessions.session_query(context, e_context["context"]["session_id"])
all_sessions.session_reply(_reply, e_context["context"]["session_id"])
reply.content = _reply
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS
return
else:
return
def run_conversation(self, input_messages, e_context: EventContext):
global function_response
content = e_context['context'].content[:]
messages = []
logger.debug(f"User input: {input_messages}") # 用户输入
openai.api_key = self.openai_api_key
openai.api_base = self.openai_api_base
response = openai.ChatCompletion.create(
model=self.functions_openai_model,
messages=input_messages,
functions=self.functions,
function_call="auto",
)
message = response["choices"][0]["message"]
# 检查模型是否希望调用函数
if message.get("function_call"):
function_name = message["function_call"]["name"]
logger.debug(f"Function call: {function_name}") # 打印函数调用
logger.debug(f"message={message}")
# 处理各种可能的函数调用,执行函数并获取函数的返回结果
if function_name == "get_weather":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
function_response = fun.get_weather(appkey=self.app_key, sign=self.app_sign,
cityNm=function_args.get("cityNm", "未指定地点"))
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_morning_news":
function_response = fun.get_morning_news(api_key=self.alapi_key)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_hotlist":
function_args_str = message["function_call"].get("arguments", "{}")
function_args = json.loads(function_args_str) # 使用 json.loads 将字符串转换为字典
hotlist_type = function_args.get("type", "未指定类型")
function_response = fun.get_hotlist(api_key=self.alapi_key, type=hotlist_type)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "search":
function_args_str = message["function_call"].get("arguments", "{}")
function_args = json.loads(function_args_str) # 使用 json.loads 将字符串转换为字典
search_query = function_args.get("query", "未指定关键词")
search_count = function_args.get("count", 1)
if "必应" in content or "newbing" in content.lower():
com_reply = Reply()
com_reply.type = ReplyType.TEXT
context = e_context['context']
if context.kwargs.get('isgroup'):
msg = context.kwargs.get('msg') # 这是WechatMessage实例
nickname = msg.actual_user_nickname # 获取nickname
com_reply.content = "@{name}\n☑️正在给您实时联网必应搜索\n⏳整理深度数据需要时间,请耐心等待...".format(
name=nickname)
else:
com_reply.content = "☑️正在给您实时联网必应搜索\n⏳整理深度数据需要时间,请耐心等待..."
if self.comapp is not None:
self.comapp.send(com_reply, e_context['context'])
function_response = fun.search_bing(subscription_key=self.bing_subscription_key, query=search_query,
count=int(search_count))
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif "谷歌" in content or "搜索" in content or "google" in content.lower():
com_reply = Reply()
com_reply.type = ReplyType.TEXT
context = e_context['context']
if context.kwargs.get('isgroup'):
msg = context.kwargs.get('msg') # 这是WechatMessage实例
nickname = msg.actual_user_nickname # 获取nickname
com_reply.content = "@{name}\n☑️正在给您实时联网谷歌搜索\n⏳整理深度数据需要几分钟,请您耐心等待...".format(
name=nickname)
else:
com_reply.content = "☑️正在给您实时联网谷歌搜索\n⏳整理深度数据需要几分钟,请您耐心等待..."
if self.comapp is not None:
self.comapp.send(com_reply, e_context['context'])
function_response = google.search_google(search_terms=search_query, base_url=self.google_base_url,
iterations=1, count=1,
api_key=self.google_api_key, cx_id=self.google_cx_id,
model=self.assistant_openai_model)
logger.debug(f"google.search_google url: {self.google_base_url}")
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
else:
return None
elif function_name == "get_oil_price":
function_response = fun.get_oil_price(api_key=self.alapi_key)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_Constellation_analysis":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
function_response = fun.get_Constellation_analysis(api_key=self.alapi_key,
star=function_args.get("star", "未指定星座"),
)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "music_search":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
function_response = fun.music_search(api_key=self.alapi_key,
keyword=function_args.get("keyword", "未指定音乐"),
)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_datetime":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
city = function_args.get("city_en", "未指定城市") # 如果没有指定城市,将默认查询北京
function_response = fun.get_datetime(appkey=self.app_key, sign=self.app_sign, city_en=city)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_url":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
url = function_args.get("url", "未指定URL")
function_response = fun.get_url(url=url)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_stock_info":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
stock_names = function_args.get("stock_names", "未指定股票信息")
function_response = stock.get_stock_info(stock_names=stock_names, appkey=self.app_key,
sign=self.app_sign)
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
elif function_name == "get_video_url":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
url = function_args.get("url", "无URL")
viedo_url = fun.get_video_url(api_key=self.alapi_key, target_url=url)
if viedo_url:
logger.debug(f"viedo_url: {viedo_url}")
reply = Reply() # 创建一个回复对象
reply.type = ReplyType.VIDEO_URL
reply.content = viedo_url
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS
return
else:
reply = Reply() # 创建一个回复对象
reply.type = ReplyType.TEXT
reply.content = "抱歉,解析失败了·······"
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS
return
elif function_name == "search_bing_news":
function_args = json.loads(message["function_call"].get("arguments", "{}"))
logger.debug(f"Function arguments: {function_args}") # 打印函数参数
search_query = function_args.get("query", "未指定关键词")
search_count = function_args.get("count", 10)
function_response = fun.search_bing_news(count=search_count,
subscription_key=self.bing_subscription_key,
query=search_query, )
function_response = json.dumps(function_response, ensure_ascii=False)
logger.debug(f"Function response: {function_response}") # 打印函数响应
else:
return
msg: ChatMessage = e_context["context"]["msg"]
current_date = datetime.now().strftime("%Y年%m月%d日%H时%M分")
if e_context["context"]["isgroup"]:
prompt = self.prompt.format(time=current_date, bot_name=msg.to_user_nickname,
name=msg.actual_user_nickname, content=content,
function_response=function_response)
else:
prompt = self.prompt.format(time=current_date, bot_name=msg.to_user_nickname,
name=msg.from_user_nickname, content=content,
function_response=function_response)
# 将函数的返回结果发送给第二个模型
logger.debug(f"prompt :" + prompt)
logger.debug("messages: %s", [{"role": "system", "content": prompt}])
second_response = openai.ChatCompletion.create(
model=self.assistant_openai_model,
messages=[
{"role": "system", "content": prompt},
],
temperature=float(self.temperature),
max_tokens=int(self.max_tokens)
)
logger.debug(f"Second response: {second_response['choices'][0]['message']['content']}") # 打印第二次的响应
messages.append(second_response["choices"][0]["message"])
return second_response['choices'][0]['message']['content']
else:
# 如果模型不希望调用函数,直接打印其响应
logger.info("模型响应无函数调用,跳过处理") # 打印模型的响应
return
def get_help_text(self, verbose=False, **kwargs):
# 初始化帮助文本,说明利用 midjourney api 来画图
help_text = "\n🔥GPT函数调用,极速联网,语境如需联网且有功能支持,则会直接联网获取实时信息\n"
# 如果不需要详细说明,则直接返回帮助文本
if not verbose:
return help_text
# 否则,添加详细的使用方法到帮助文本中
help_text = "newgpt_turbo,极速联网无需特殊指令,前置识别\n🔎谷歌搜索、🔎新闻搜索\n🗞每日早报、☀全球天气\n⌚实时时间、⛽全国油价\n🌌星座运势、🎵音乐(网易云)\n🔥各类热榜信息、📹短视频解析等"
# 返回帮助文本
return help_text