-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbot.py
311 lines (250 loc) · 11.9 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# bot.py
import asyncio
import logging
import time
from fastapi import FastAPI, Request # type: ignore
from plugin_manager import PluginManager
import json
import websockets
import os
import importlib.util # 用于动态加载模块
import inspect # 用于检查模块内的函数
# 配置日志
logger = logging.getLogger("DreamSu")
class Bot:
def __init__(self):
# 确保关键目录存在
os.makedirs("cache", exist_ok=True)
os.makedirs("logs", exist_ok=True)
os.makedirs("plugins", exist_ok=True)
# 动态加载 API 方法
# self.load_api_methods('api') 禁止这样做
self.load_api_methods('api/bot')
# 导入配置文件
config = self.load_config("config/config.json")
dreamsu = self.load_config("config/DreamSu.json")
self.bot_version = dreamsu.get("bot_version")
# 连接配置
self.rtmsg_type = config.get("rtmsg_type", "http") # 默认为 HTTP
self.rq_type = config.get("rq_type", "http") # 默认为 HTTP
self.rt_http_port = config.get("rt_http_port", 18080) # HTTP 上报消息接收端口
self.rq_http_url = config.get("rq_http_url") # HTTP api 请求地址
self.r_ws_port = config.get("r_ws_port", 18081) # 反向 ws 端口
self.f_ws_url = config.get("f_ws_url") # 正向 ws 连接地址
# 临时兼容旧的连接配置
self.base_url = config.get("rq_http_url")
# 口令
self.token = config.get("token")
# 初始化好友列表和群列表
self.friend_list = [] # 存储好友列表
self.group_list = [] # 存储群列表
# 实例化插件管理器
self.plugin_manager = PluginManager(self)
self.pm_status = 1 # 插件管理状态
self.pm_list = {} # 插件列表
# 读取配置文件中的主人ID
self.master_ids = config.get('masters', [])
# 机器人基本信息
self.bot_info = {}
self.bot_id = None
self.bot_nickname = None
self.get_cookies = {}
# 初始化FastAPI
self.app = FastAPI()
self.dv = 0
# 增加消息队列
self.message_queue = asyncio.Queue()
self.create_routes()
async def start(self):
logger.info("DreamSuOB启动中...")
logger.info(f"当前版本: {self.bot_version}")
logger.info(f"主人账号: {self.master_ids}")
logger.info("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-||")
from api.get import get_login_info, get_cookies
logger.info("正在获取bot账号信息...")
self.bot_info = get_login_info(self.base_url, self.token)
self.bot_id = self.bot_info.get('user_id')
self.bot_nickname = self.bot_info.get('nickname')
logger.info(f"账号: {self.bot_id} ")
logger.info(f"昵称: {self.bot_nickname} ")
if not self.bot_info or 'user_id' not in self.bot_info or 'nickname' not in self.bot_info:
logger.warning("获取 bot 账号信息失败或数据缺失!")
self.dv += 1
self.get_cookies_url = "act.qzone.qq.com"
self.get_cookies = get_cookies(self.base_url, self.get_cookies_url, self.token)
logger.debug(f"获取 bot 在 {self.get_cookies_url} 的 cookie : \n{self.get_cookies} \n")
if not self.get_cookies:
logger.warning("获取 cookies 失败或数据为空!")
self.dv += 1
try:
# 更新好友列表和群列表
logger.info("正在加载好友列表...")
new_friends, removed_friends, len_friends = await self.update_friend_list()
logger.info(f"加载成功 {len_friends} 个好友\n")
except TypeError as e:
logger.warning("加载好友列表时发生错误: %s", str(e))
self.dv += 3
try:
logger.info("正在加载群列表...")
new_groups, removed_groups, len_groups = await self.update_group_list()
logger.info(f"加载成功 {len_groups} 个群聊")
except TypeError as e:
logger.warning("加载群列表时发生错误: %s", str(e))
self.dv += 6
if self.dv == 0:
pass
elif self.dv < 3:
logger.warning("Bot账号数据残缺")
elif self.dv >= 3:
logger.warning(" ")
logger.warning("Bot账号数据严重残缺")
logger.warning("框架尝试继续运行,不保证稳定性。随时可能崩溃。")
logger.warning(" ")
# 启动异步更新任务
asyncio.create_task(self.schedule_updates())
logger.info("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-||\n")
await self.plugin_manager.load_plugins('plugins', 'plugins/example') # 加载插件
async def main():
if self.rtmsg_type == "ws":
task1 = asyncio.create_task(self.websocket_server()) # 启动 WebSocket 服务器
else:
task1 = asyncio.create_task(self.http_server()) # 启动 HTTP 服务器
if self.rq_type == "ws":
task2 = asyncio.create_task(self.reverse_websocket_server()) # 启动反向 WebSocket 服务器
else:
task2 = None
tasks = [task1]
if task2 is not None:
tasks.append(task2)
await asyncio.gather(*tasks) # 并发运行所有任务
await main() # 启动主事件循环
def load_api_methods(self, directory):
"""动态加载指定目录下的所有.py文件中的函数,并将其添加到当前类的实例"""
logger.info("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-||")
if directory == "api":
logger.warning("请不要这样做!")
return
else:
logger.info("正在加载框架内部方法...")
logger.debug(" ")
for filename in os.listdir(directory):
if filename.endswith('.py'):
module_name = filename[:-3] # 去掉".py"后缀
file_path = os.path.join(directory, filename)
try:
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
except Exception as e:
logger.error(f"加载模块 {module_name} 失败: {e}")
continue
# 获取模块中的所有函数并添加到当前类的实例中
for name, func in inspect.getmembers(module, inspect.isfunction):
setattr(self, name, func.__get__(self))
# 添加Debug日志来输出每个成功加载的方法名
logger.debug(f"成功加载方法: {name} 来自模块: {module_name}")
if directory == "api":
logger.warning("雪豹闭嘴")
else:
logger.info("框架内部方法加载完毕")
logger.info("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-||")
async def http_server(self):
# 启动 HTTP 消息接收服务器
logger.info("消息接收服务器启动中...")
import uvicorn # type: ignore
config = uvicorn.Config(
self.app,
host="0.0.0.0",
port=self.rt_http_port,
workers=4,
log_level="critical" # 仅记录严重错误
)
server = uvicorn.Server(config)
logger.info("HTTP 消息接收服务器已成功启动。\n")
await server.serve()
async def websocket_server(self):
# 正向 WebSocket 消息接收服务器
logger.info("消息接收服务器启动中...")
logger.info(f"尝试连接到 WebSocket 地址: {self.f_ws_url}")
try:
async with websockets.connect(self.f_ws_url, extra_headers={"Authorization": f"Bearer {self.token}"}) as websocket:
logger.info(f"\n\n成功连接到OneBot WebSocket 地址: {self.f_ws_url}\n\n开始接收消息\n")
while True:
message = await websocket.recv() # 接收消息
logger.debug(f"收到 WebSocket 消息原文: {message}")
try:
# 将消息字符串转换为 JSON 数据
data = json.loads(message)
# 格式化消息并输出到日志
extracted_info = self.extract_message_info(data)
if data.get('post_type') == 'meta_event':
logger.debug("[WS生命状态]%s", extracted_info)
else:
logger.info("[WSmsg]%s ", extracted_info)
# 处理消息
await self.handle_message(data)
except json.JSONDecodeError:
logger.error("无法解析 WebSocket 消息的 JSON 数据")
except Exception as e:
logger.error(f"处理 WebSocket 消息时出错: {e}")
except Exception as e:
logger.error(f"WebSocket 连接失败: {e} \n\n5秒\n后重连")
await asyncio.sleep(5) # 等待5秒后重试
await self.websocket_server() # 重试连接
def create_routes(self):
# HTTP 接收消息后路由
@self.app.post("/")
async def root(request: Request):
data = await request.json()
extracted_info = self.extract_message_info(data)
logger.info("[HTTPmsg]%s ", extracted_info)
try:
await self.handle_message(data)
except json.JSONDecodeError:
logger.error("无法解析 http 消息的 JSON 数据")
except Exception as e:
logger.error(f"处理 http 消息时出错: {e}")
return {}
async def handle_message(self, message):
# 分发消息给插件管理器
if 'raw_message' not in message:
if 'meta_event_type' in message:
logger.debug("心跳事件")
return
if message.get('post_type') == 'notice':
if 'recall' in message.get('notice_type', ''):
logger.debug("消息被撤回")
else:
logger.warning("消息丢失,可能被撤回")
return
semaphore = asyncio.Semaphore(200) # 限制并发处理任务数量为200
try:
await self.plugin_manager.dispatch_message(message, semaphore)
except KeyError as e:
if str(e) == "'raw_message'":
logger.debug("收到的消息中缺少 'raw_message' 字段")
else:
logger.error(f"某个插件在处理消息时出错: {e}")
except Exception as e:
if str(e) == "'Bot' object has no attribute 'bot'":
logger.error(f"某个插件错误的传递或访问 Bot 实例: {e}")
else:
logger.error(f"某个插件在处理消息时出错: {e}")
async def schedule_updates(self):
# 异步定时任务
async def update_friends():
while True:
try:
await self.update_friend_list()
except Exception as e:
logger.warning("更新好友列表时发生错误: %s", str(e))
await asyncio.sleep(60) # 每1分钟更新好友列表
async def update_groups():
while True:
try:
await self.update_group_list()
except Exception as e:
logger.warning("更新群列表时发生错误: %s", str(e))
await asyncio.sleep(300) # 每5分钟更新群列表
# 启动并行任务
await asyncio.gather(update_friends(), update_groups())