-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbnb质押播报
416 lines (342 loc) · 16.4 KB
/
bnb质押播报
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
import time
from datetime import datetime
from web3 import Web3
import random
import asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters, CallbackQueryHandler
import logging
from telegram.ext import JobQueue
from telegram.constants import ChatType
import json
import os
# 设置日志
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
# 交易监控范围(ETH)
MIN_ETH_VALUE = 1
MAX_ETH_VALUE = 100
# 区块间隔范围
MIN_BLOCK_INTERVAL = 1
MAX_BLOCK_INTERVAL = 15
# Telegram Bot Token
TELEGRAM_TOKEN = "7798725461:AAG6IUL9Mb4bxMBpQMbHjT_Gjn1u5HzIf7I"
# 连接到以太坊网络
INFURA_URL = "https://mainnet.infura.io/v3/5566118678b6401ab099e0deac9af291"
w3 = Web3(Web3.HTTPProvider(INFURA_URL))
# 存储活跃的群组ID和它们的监控状态
active_groups = {}
# 添加按钮配置存储
button_configs = {}
# 添加常量定义在文件开头
ADMIN_IDS = [8025957025, 6279591488, 5868384843]
def parse_button_config(text):
"""解析按钮配置文本"""
buttons = []
rows = text.strip().split('\n')
for row in rows:
button_row = []
# 分割同一行的多个按钮
row_buttons = row.split('|')
for button in row_buttons:
button = button.strip()
if not button:
continue
# 分割按钮名称和数据
parts = button.split('-', 1)
if len(parts) != 2:
continue
name = parts[0].strip()
data = parts[1].strip()
# 检查是否是链接按钮
if data.startswith(('http://', 'https://', 'tg://')):
button_row.append(InlineKeyboardButton(text=name, url=data))
else:
button_row.append(InlineKeyboardButton(text=name, callback_data=data))
if button_row:
buttons.append(button_row)
return buttons
def save_button_config(chat_id, config):
"""保存按钮配置"""
button_configs[chat_id] = config
# 可以选择将配置保存到文件中以持久化
try:
with open('button_configs.json', 'w', encoding='utf-8') as f:
json.dump(button_configs, f, ensure_ascii=False)
except Exception as e:
logging.error(f"保存按钮配置出错: {str(e)}")
def load_button_configs():
"""加载按钮配置"""
global button_configs
try:
if os.path.exists('button_configs.json'):
with open('button_configs.json', 'r', encoding='utf-8') as f:
button_configs = json.load(f)
except Exception as e:
logging.error(f"加载按钮配置出错: {str(e)}")
class GroupMonitor:
def __init__(self, chat_id):
self.chat_id = chat_id
self.last_notification_block = None
self.next_block_interval = None
self.processed_transactions = set()
async def process_block_transactions(self, block, context: ContextTypes.DEFAULT_TYPE):
"""处理区块中的交易并发送到Telegram"""
try:
# 筛选符合金额条件的交易
valid_transactions = [
tx for tx in block.transactions
if MIN_ETH_VALUE <= float(w3.from_wei(tx.value, 'ether')) <= MAX_ETH_VALUE
and tx.hash.hex() not in self.processed_transactions
]
has_transactions = False
# 处理所有符合条件的新交易
for tx in valid_transactions:
has_transactions = True
self.processed_transactions.add(tx.hash.hex())
# 计算ETH数量
value_eth = w3.from_wei(tx.value, 'ether')
# 格式化时间
timestamp = datetime.fromtimestamp(block.timestamp)
# 获取合约地址
contract_address = tx.to if tx.to else "合约创建"
report = f"""区块高度:{block.number}
时间:{timestamp.strftime('%Y-%m-%d %H:%M:%S')}
来源:{tx['from']}
目的:币安质押矿池(尾号***000000)
数量:{value_eth:.2f} ETH(以太坊)
合约地址:
<code>{contract_address}</code>(点击复制)"""
# 直接使用ADMIN_ID的按钮配置
admin_config = button_configs.get(str(ADMIN_IDS[0]))
if admin_config:
buttons = parse_button_config(admin_config)
reply_markup = InlineKeyboardMarkup(buttons)
await context.bot.send_message(
chat_id=self.chat_id,
text=report,
parse_mode='html',
reply_markup=reply_markup
)
else:
await context.bot.send_message(
chat_id=self.chat_id,
text=report,
parse_mode='html'
)
# 清理旧的交易记录
if len(self.processed_transactions) > 1000:
self.processed_transactions.clear()
return has_transactions
except Exception as e:
logging.error(f"处理交易时出错: {str(e)}")
return False
async def monitor_block(self, context: ContextTypes.DEFAULT_TYPE):
"""监控区块并发送更新"""
try:
latest_block = w3.eth.get_block('latest', full_transactions=True)
logging.info(f"获取到最新区块: {latest_block.number}")
# 首次运行
if self.last_notification_block is None:
logging.info(f"群组 {self.chat_id} 首次运行监控")
await self.process_block_transactions(latest_block, context)
self.last_notification_block = latest_block.number
self.next_block_interval = random.randint(MIN_BLOCK_INTERVAL, MAX_BLOCK_INTERVAL)
return
# 检查是否达到通知间隔
blocks_passed = latest_block.number - self.last_notification_block
logging.info(f"群组 {self.chat_id} - 已经过去 {blocks_passed} 个区块,下一次通知需要 {self.next_block_interval} 个区块")
if blocks_passed < self.next_block_interval:
return
# 发送新的交易信息
logging.info(f"群组 {self.chat_id} - 开始处理新区块交易")
await self.process_block_transactions(latest_block, context)
self.last_notification_block = latest_block.number
self.next_block_interval = random.randint(MIN_BLOCK_INTERVAL, MAX_BLOCK_INTERVAL)
except Exception as e:
logging.error(f"群组 {self.chat_id} 监控出错: {str(e)}")
async def check_all_groups(context: ContextTypes.DEFAULT_TYPE):
"""检查所有活跃群组的任务"""
logging.info(f"开始检查所有群组,当前活跃群组数: {len(active_groups)}")
for group_id, monitor in active_groups.items():
try:
logging.info(f"正在检查群组 {group_id}")
await monitor.monitor_block(context)
except Exception as e:
logging.error(f"群组 {group_id} 监控出错: {str(e)}")
async def handle_group_event(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理群组事件"""
# 检查是否是群组消息
if update.message and update.message.chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]:
chat_id = update.message.chat.id
logging.info(f"收到群组 {chat_id} 的事件")
# 检查是否是有关机器人的消息
if update.message.new_chat_members:
for member in update.message.new_chat_members:
if member.id == context.bot.id: # 机器人被加入群组
logging.info(f"机器人被添加到群组 {chat_id}")
if chat_id not in active_groups:
active_groups[chat_id] = GroupMonitor(chat_id)
logging.info(f"为群组 {chat_id} 创建新的监控器")
await update.message.reply_text("ETH交易监控启动!")
elif update.message.left_chat_member:
if update.message.left_chat_member.id == context.bot.id: # 机器人被移出群组
logging.info(f"机器人被移出群组 {chat_id}")
if chat_id in active_groups:
del active_groups[chat_id]
async def set_buttons(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理设置按钮的命令"""
chat_id = update.effective_chat.id
user_id = update.effective_user.id
# 只在私聊中处理,且只允许特定管理员
if update.effective_chat.type != ChatType.PRIVATE or user_id not in ADMIN_IDS:
return
# 发送设置说明
instructions = """请发送按钮配置,格式如下:
<code>按钮1名字 - 跳转链接</code>
<code>按钮2名字 - 跳转链接 | 按钮3名字 - 跳转链接</code>
<code>按钮4名字 - 跳转链接</code>
▪️ 插入跳转链接示例:
<code>按钮名字 - https://t.me/telegram</code>
💬 温馨提示:
1. 一行文本即一行按钮,同一行多个按钮通过 | 分割
2. 链接需要 http 开头
发送 /cancel 取消设置"""
# 发送说明时禁用链接预览
await update.message.reply_text(instructions, disable_web_page_preview=True, parse_mode='html')
# 显示当前按钮配置
current_config = button_configs.get(str(ADMIN_IDS[0]))
if current_config:
buttons = parse_button_config(current_config)
reply_markup = InlineKeyboardMarkup(buttons)
await update.message.reply_text("当前按钮配置预览:", reply_markup=reply_markup)
else:
await update.message.reply_text("当前没有按钮配置。")
context.user_data['waiting_for_button_config'] = True
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""取消当前操作"""
if 'waiting_for_button_config' in context.user_data:
del context.user_data['waiting_for_button_config']
await update.message.reply_text("已取消按钮设置。")
async def handle_button_config(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理按钮配置消息"""
if not context.user_data.get('waiting_for_button_config'):
return
# 检查是否是指定管理员
if update.effective_user.id not in ADMIN_IDS:
await update.message.reply_text("只有指定管理员才能设置按钮!")
return
try:
buttons = parse_button_config(update.message.text)
if not buttons:
await update.message.reply_text("按钮配置格式错误,请重新设置!")
return
# 保存配置
save_button_config(str(ADMIN_IDS[0]), update.message.text)
# 显示预览
reply_markup = InlineKeyboardMarkup(buttons)
await update.message.reply_text("按钮设置成功!以下是预览:", reply_markup=reply_markup)
del context.user_data['waiting_for_button_config']
except Exception as e:
await update.message.reply_text(f"设置按钮时出错:{str(e)}")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /start 命令"""
chat_id = update.effective_chat.id
user_id = update.effective_user.id
# 如果是私聊,直接进入设置按钮流程
if update.effective_chat.type == ChatType.PRIVATE:
# 检查是否是指定管理员
if user_id in ADMIN_IDS:
# 直接进入设置按钮流程
instructions = """请发送按钮配置,格式如下:
<code>按钮1名字 - 跳转链接</code>
<code>按钮2名字 - 跳转链接 | 按钮3名字 - 跳转链接</code>
<code>按钮4名字 - 跳转链接</code>
▪️ 插入跳转链接示例:
<code>按钮名字 - https://t.me/telegram</code>
💬 温馨提示:
1. 一行文本即一行按钮,同一行多个按钮通过 | 分割
2. 链接需要 http 开头
发送 /cancel 取消设置"""
context.user_data['waiting_for_button_config'] = True
await update.message.reply_text(instructions, disable_web_page_preview=True, parse_mode='html')
else:
await update.message.reply_text("只有指定管理员才能设置按钮!")
return
# 群组消息处理
if update.effective_chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]:
try:
chat_member = await context.bot.get_chat_member(chat_id, user_id)
is_admin = chat_member.status in ['creator', 'administrator']
if not is_admin:
await update.message.reply_text("只有群组管理员才能启动监控!")
return
if chat_id not in active_groups:
active_groups[chat_id] = GroupMonitor(chat_id)
logging.info(f"群组 {chat_id} 的监控已由管理员 {user_id} 启动")
await update.message.reply_text("🎉 ETH交易监控机器人已启动!\n\n此机器人将自动监控以太坊交易,实时推送最新交易信息。\n\n💡 Tips: 机器人会自动过滤重复消息,确保推送的都是最新交易。")
else:
await update.message.reply_text("监控已经在运行中!")
except Exception as e:
logging.error(f"检查管理员权限时出错: {str(e)}")
await update.message.reply_text("检查权限时出错,请确保机器人具有足够的权限!")
async def stop(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /stop 命令"""
chat_id = update.effective_chat.id
# 如果是私聊,忽略命令
if update.effective_chat.type == ChatType.PRIVATE:
await update.message.reply_text("这是一个群组机器人,只能在群组中使用!")
return
# 如果是群组消息,检查发送命令的用户是否是管理员
if update.effective_chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]:
user_id = update.effective_user.id
try:
chat_member = await context.bot.get_chat_member(chat_id, user_id)
is_admin = chat_member.status in ['creator', 'administrator']
if not is_admin:
await update.message.reply_text("只有群组管理员才能停止监控!")
return
if chat_id in active_groups:
del active_groups[chat_id]
await update.message.reply_text("ETH交易监控已停止!")
logging.info(f"群组 {chat_id} 的监控已由管理员 {user_id} 停止")
else:
await update.message.reply_text("监控未在运行!")
except Exception as e:
logging.error(f"检查管理员权限时出错: {str(e)}")
await update.message.reply_text("检查权限时出错,请确保机器人具有足够的权限!")
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理按钮回调"""
query = update.callback_query
if query.data == "set_buttons":
await query.answer()
await set_buttons(update, context)
def main():
"""启动机器人"""
# 加载已保存的按钮配置
load_button_configs()
# 创建应用
application = Application.builder().token(TELEGRAM_TOKEN).build()
# 添加命令处理器
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("stop", stop))
application.add_handler(CommandHandler("cancel", cancel))
# 添加消息处理器
application.add_handler(MessageHandler(
filters.TEXT & filters.ChatType.PRIVATE & ~filters.COMMAND,
handle_button_config
))
# 添加群组事件处理器
application.add_handler(MessageHandler(
filters.ChatType.GROUPS & (filters.StatusUpdate.NEW_CHAT_MEMBERS | filters.StatusUpdate.LEFT_CHAT_MEMBER),
handle_group_event
))
# 设置定时任务,每10秒执行一次
job_queue = application.job_queue
job_queue.run_repeating(check_all_groups, interval=10, first=1)
# 启动机器人
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()