-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path检测账号存活数量
179 lines (148 loc) · 6.19 KB
/
检测账号存活数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from telethon import TelegramClient, events
import asyncio
import zipfile
import os
import tempfile
import shutil
from telethon.errors import SessionPasswordNeededError, PhoneNumberInvalidError
from concurrent.futures import ThreadPoolExecutor
import time
# Telegram API credentials and the bot token used by every client in this file.
# NOTE(review): these secrets are hard-coded in source. Consider loading them
# from environment variables or an untracked config file, and rotating the
# values that have been committed here.
API_ID = 21332425
API_HASH = 'f5d0cddc784e3a7a09ea9714ed01f238'
BOT_TOKEN = '7432303874:AAH2VVm31mAOVW3tJomqXKdLb6KHXGkK6pE'
# Create the bot client (its session is stored under the name 'bot_session').
bot = TelegramClient('bot_session', API_ID, API_HASH)
# Create a thread pool.
# NOTE(review): `executor` is not referenced by any code visible in this file;
# confirm whether it is still needed.
executor = ThreadPoolExecutor(max_workers=10)
class Progress:
def __init__(self, total):
self.total = total
self.current = 0
self.valid_count = 0
self.results = []
self.lock = asyncio.Lock()
async def update(self, is_valid, session_name, username=None):
async with self.lock:
self.current += 1
if is_valid:
self.valid_count += 1
self.results.append(f"✅ {session_name} -> @{username}")
else:
self.results.append(f"❌ {session_name}")
return (f"正在检查 Session 文件...\n"
f"进度: {self.current}/{self.total} "
f"({(self.current/self.total*100):.1f}%)\n"
f"已找到可用: {self.valid_count}\n\n"
f"最近5个结果:\n" +
"\n".join(self.results[-5:]))
async def check_session(session_path, progress, progress_message):
    """Check whether a single session file is still authorised.

    Connects a temporary Telethon client built from ``session_path``, asks
    whether the user is authorised, and records the outcome in ``progress``.

    Args:
        session_path: Path to the ``.session`` file to test.
        progress: Tracker whose ``update`` coroutine returns the status text.
        progress_message: Message object whose ``edit`` coroutine shows the
            status text to the user.

    Returns:
        A tuple ``(is_valid, session_path, username)``; ``username`` is the
        account's username (or its id when it has none) for valid sessions
        and ``None`` otherwise.
    """
    session_name = os.path.basename(session_path)
    label = session_name
    is_valid = False
    username = None

    try:
        # Build a temporary client from the session file.
        client = TelegramClient(session_path, API_ID, API_HASH)
        try:
            await client.connect()
            if await client.is_user_authorized():
                me = await client.get_me()
                username = me.username if me.username else me.id
                is_valid = True
        finally:
            # Always release the connection, even when the check itself fails.
            await client.disconnect()
    except Exception as e:
        is_valid = False
        username = None
        label = f"{session_name} (错误: {str(e)})"

    # Count each session exactly once, outside the try block above, so that a
    # failure to display progress cannot turn a valid session into an error.
    progress_text = await progress.update(is_valid, label, username)
    try:
        await progress_message.edit(progress_text)
    except Exception:
        # Displaying progress is best-effort: many checks edit the same
        # message concurrently, and a rejected edit must not abort the batch.
        pass

    return is_valid, session_path, username
async def process_zip_file(zip_path, progress_message):
    """Extract an archive, check every ``.session`` file in it, and repack the valid ones.

    Args:
        zip_path: Path of the uploaded zip archive.
        progress_message: Message object whose ``edit`` coroutine is used to
            report progress and the final summary.

    Returns:
        Path of a newly created zip archive containing the valid session
        files, or ``None`` when the archive holds no ``.session`` files or
        none of them is valid. The caller owns the returned file and is
        responsible for deleting it.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        # Extract the archive.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # Collect every .session file, at any depth.
        session_files = []
        for root, _dirs, files in os.walk(temp_dir):
            for file_name in files:
                if file_name.endswith('.session'):
                    session_files.append(os.path.join(root, file_name))

        total = len(session_files)
        if total == 0:
            await progress_message.edit("未找到任何.session文件!")
            return None

        # Check all sessions concurrently, sharing one progress tracker.
        progress = Progress(total)
        results = await asyncio.gather(
            *(check_session(path, progress, progress_message)
              for path in session_files)
        )
        valid_sessions = [path for is_valid, path, _username in results
                          if is_valid]

        if not valid_sessions:
            await progress_message.edit(
                f"检查完成!\n总数: {total}\n没有找到可用的session")
            return None

        # Use a unique output path so that archives processed at the same
        # time do not overwrite each other's result file.
        fd, output_zip = tempfile.mkstemp(prefix='valid_sessions_',
                                          suffix='.zip')
        os.close(fd)
        try:
            # NOTE(review): entries are stored under their base name, so two
            # sessions with the same file name in different folders become
            # duplicate archive entries.
            with zipfile.ZipFile(output_zip, 'w') as zipf:
                for session in valid_sessions:
                    zipf.write(session, os.path.basename(session))

            # Final report: statistics only.
            final_report = (f"✅ 检查完成!\n"
                            f"总数: {total}\n"
                            f"可用: {len(valid_sessions)}")
            await progress_message.edit(final_report)
        except BaseException:
            # The caller never receives the path on failure (including
            # cancellation), so remove the archive here instead of leaking it.
            os.remove(output_zip)
            raise
        return output_zip
    finally:
        # Remove the extraction directory.
        shutil.rmtree(temp_dir)
@bot.on(events.NewMessage(pattern='/start'))
async def start_handler(event):
    """Answer the /start command with a short usage instruction."""
    usage_text = '请发送包含session文件的压缩包,我将检查其中可用的session并返回。'
    await event.reply(usage_text)
@bot.on(events.NewMessage(
    # Media without a filename has file.name == None, so check it before
    # calling string methods on it.
    func=lambda e: bool(
        e.file and e.file.name and e.file.name.lower().endswith('.zip')
    )
))
async def handle_zip(event):
    """Handle an incoming zip archive of session files.

    Downloads the archive, passes it to ``process_zip_file``, sends back the
    archive of valid sessions when there is one, and removes every temporary
    file it created regardless of the outcome.

    Args:
        event: The new-message event carrying the zip attachment.
    """
    chat_id = event.chat_id
    progress_message = await event.reply("收到压缩包,开始处理...\n⏳ 正在下载压缩包")

    download_path = None
    valid_sessions_zip = None
    try:
        # Download the archive inside the try block so that a failed download
        # is reported to the user like any other processing error.
        download_path = await event.download_media()
        if not download_path:
            raise RuntimeError("下载压缩包失败")

        await progress_message.edit("⏳ 正在检查session文件...")

        # Process the archive.
        valid_sessions_zip = await process_zip_file(download_path,
                                                    progress_message)
        if valid_sessions_zip:
            # Send back the archive containing the valid sessions.
            await bot.send_file(
                chat_id,
                valid_sessions_zip,
                caption="✅ 这是筛选出的可用session文件"
            )
    except Exception as e:
        await progress_message.edit(f"❌ 处理过程中出现错误: {str(e)}")
    finally:
        # Remove both the result archive and the downloaded archive, even
        # when sending or processing failed part-way through.
        for leftover in (valid_sessions_zip, download_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
async def main():
    """Start the bot with ``BOT_TOKEN`` and run until it disconnects."""
    print('正在启动机器人...')
    await bot.start(bot_token=BOT_TOKEN)
    print('机器人启动成功!')
    await bot.run_until_disconnected()


# Run the main program only when executed as a script, so that importing
# this module does not start the bot.
if __name__ == '__main__':
    asyncio.run(main())