-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtelegram_client.py
152 lines (131 loc) · 6.07 KB
/
telegram_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import os
import aiohttp
from telethon import TelegramClient, events
from telethon.tl.types import InputPeerChannel
from lib.config import (
telegram_api_id,
telegram_api_hash,
telegram_phone_number,
telegram_channel_usernames,
)
from gemini_llm import analyze_with_gemini
from db.db_operations import db_operations
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def download_image(message, client):
if message.photo:
path = await message.download_media("downloaded_media")
return path
elif message.document and message.document.mime_type.startswith("image"):
path = await message.download_media("downloaded_media")
return path
async def fetch_token_info_from_dexscreener(ticker):
async with aiohttp.ClientSession() as session:
url = f"https://api.dexscreener.io/latest/dex/search?q={ticker}"
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
if data["pairs"] and len(data["pairs"]) > 0:
pair = data["pairs"][0]
return {
"token_address": pair["baseToken"]["address"],
"token_name": pair["baseToken"]["name"],
"token_image": pair.get("info", {}).get("imageUrl"),
"network": (
pair["chainId"].capitalize()
if pair.get("chainId")
else None
),
"token_ticker": (
pair["baseToken"]["symbol"]
if pair.get("baseToken")
else ticker
),
}
return None
async def message_handler(event):
message = event.message
image_path = await download_image(message, event.client) if message.media else None
analysis_result = await analyze_with_gemini(image_path, message.text)
try:
if analysis_result:
if analysis_result["is_alpha_call"] and analysis_result["token_ticker"]:
# Remove $ from ticker if present
if analysis_result["token_ticker"].startswith("$"):
analysis_result["token_ticker"] = analysis_result["token_ticker"][
1:
]
# Check for missing network
# if not analysis_result.get("network"):
# network = await db_operations.token_repo.get_network_for_ticker(
# analysis_result["token_ticker"]
# )
# # network might be null
# if network and network != "null":
# analysis_result["network"] = network
# Fetch token info from DexScreener if network or address is missing
if not analysis_result.get("network") or not analysis_result.get(
"token_address"
):
token_info = await fetch_token_info_from_dexscreener(
analysis_result["token_ticker"]
)
if token_info:
analysis_result["token_address"] = token_info["token_address"]
analysis_result["token_name"] = token_info["token_name"]
analysis_result["token_ticker"] = token_info["token_ticker"]
if token_info["token_image"]:
analysis_result["token_image"] = token_info["token_image"]
if not analysis_result.get("network"):
analysis_result["network"] = token_info["network"]
channel = await event.get_chat()
analysis_result["channel_name"] = channel.title
if channel.username: # Public channel
analysis_result["message_url"] = (
f"https://t.me/{channel.username}/{message.id}"
)
else: # Private channel
analysis_result["message_url"] = (
f"https://t.me/c/{channel.id}/{message.id}"
)
analysis_result["date"] = message.date.isoformat()
# Only save the alpha call if we have all required information
if analysis_result.get("network") and analysis_result.get(
"token_address"
):
print("Alpha call detected")
await db_operations.token_repo.save_alpha_call(analysis_result)
else:
print("Message discarded: Missing network or token_address")
else:
print(
"Message discarded: Not an alpha call or missing required information"
)
else:
print("Failed to analyze message")
except Exception as e:
logger.error(f"An error occurred: {e}")
if image_path:
os.remove(image_path) # Clean up the downloaded image
async def start_telegram_client():
client = TelegramClient("session", telegram_api_id, telegram_api_hash)
try:
await client.start(phone=telegram_phone_number)
print("Client Created")
if not await client.is_user_authorized():
await client.send_code_request(telegram_phone_number)
await client.sign_in(telegram_phone_number, input("Enter the code: "))
channels = []
for username in telegram_channel_usernames:
channel = await client.get_entity(username)
channels.append(InputPeerChannel(channel.id, channel.access_hash))
# Register the message handler
client.add_event_handler(message_handler, events.NewMessage(chats=channels))
print("Listening for new messages...")
await client.run_until_disconnected()
except Exception as e:
logger.error(f"An error occurred: {e}")
finally:
await client.disconnect()