-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathinline_spotify.py
executable file
·363 lines (314 loc) · 12.5 KB
/
inline_spotify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
__version__ = (2, 1, 1)
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://img.icons8.com/color/480/000000/playstation-buttons.png
# meta banner: https://mods.hikariatama.ru/badges/inline_spotify.jpg
# meta developer: @hikarimods
# scope: inline
# scope: hikka_only
# scope: hikka_min 1.5.3
import asyncio
import logging
import time
from math import ceil
from typing import Union
from telethon.tl.types import Message
from .. import loader, utils
from ..inline.types import InlineCall, InlineMessage
logger = logging.getLogger(__name__)
def create_bar(pb):
try:
percentage = ceil(pb["progress_ms"] / pb["item"]["duration_ms"] * 100)
bar_filled = ceil(percentage / 10)
bar_empty = 10 - bar_filled
bar = "".join("─" for _ in range(bar_filled))
bar += "🞆"
bar += "".join("─" for _ in range(bar_empty))
bar += (
f' {pb["progress_ms"] // 1000 // 60:02}:{pb["progress_ms"] // 1000 % 60:02} /'
)
bar += (
f' {pb["item"]["duration_ms"] // 1000 // 60:02}:{pb["item"]["duration_ms"] // 1000 % 60:02}'
)
except Exception:
bar = "──────🞆─── 0:00 / 0:00"
return bar
@loader.tds
class InlineSpotifyMod(loader.Module):
"""EXTENSION for SpotifyNow mod, that allows you to send interactive player."""
strings = {
"name": "InlineSpotify",
"input": "🎧 Enter the track name",
"search": "🔎 Search",
"listening_to": "I'm listening to",
"download": "📥 Download",
}
strings_ru = {
"input": "🎧 Введи название трека",
"search": "🔎 Поиск",
"_cmd_doc_splayer": (
"Отправляет интерактивный плеер Spotify (активен в течение 5 минут!)"
),
"_cls_doc": (
"Дополнение для модуля SpotifyNow, позволяющее вызвать интерактивный плеер."
),
"listening_to": "Сейчас я слушаю",
"download": "📥 Скачать",
}
strings_it = {
"input": "🎧 Inserisci il nome della traccia",
"search": "🔎 Cerca",
"_cmd_doc_splayer": (
"Invia un player Spotify interattivo (attivo per 5 minuti!)"
),
"_cls_doc": (
"Estensione per il modulo SpotifyNow, che consente di inviare un player"
" interattivo."
),
"listening_to": "Sto ascoltando",
"download": "📥 Scarica",
}
strings_es = {
"input": "🎧 Introduzca el nombre de la pista",
"search": "🔎 Buscar",
"_cmd_doc_splayer": (
"Envía un reproductor de Spotify interactivo (¡activo durante 5 minutos!)"
),
"_cls_doc": (
"Extensión para el módulo SpotifyNow, que permite enviar un reproductor"
" interactivo."
),
"listening_to": "Estoy escuchando",
"download": "📥 Descargar",
}
strings_uz = {
"input": "🎧 Ishora nomini kiriting",
"search": "🔎 Qidirish",
"_cmd_doc_splayer": (
"Qo'llab-quvvatlash uchun Spotify interaktiv oynasini yuboring (5 daqiqada"
" faol!)"
),
"_cls_doc": (
"SpotifyNow moduli uchun kengaytma, interaktiv oynani yuborish mumkin."
),
"listening_to": "Meni eshitib turaman",
"download": "📥 Yuklab oling",
}
strings_tr = {
"input": "🎧 Parçanın adını girin",
"search": "🔎 Ara",
"_cmd_doc_splayer": (
"Etkileşimli bir Spotify oynatıcı gönderir (5 dakika boyunca etkin!)"
),
"_cls_doc": (
"SpotifyNow modülü eklentisi, etkileşimli bir oynatıcı göndermenizi sağlar."
),
"listening_to": "Şu anda dinliyorum",
"download": "📥 İndir",
}
strings_kk = {
"input": "🎧 Тақырып атауын енгізіңіз",
"search": "🔎 іздеу",
"_cmd_doc_splayer": (
"Spotify интерактивті ойынды жіберіңіз (5 минутта белсенді!)"
),
"_cls_doc": (
"SpotifyNow модулі қосымшасы, интерактивті ойынды жіберуге мүмкіндік"
" береді."
),
"listening_to": "Ағымда маңызды болатындыңызды көрудіңіз керек",
"download": "📥 Жүктеу",
}
strings_de = {
"input": "🎧 Geben Sie den Namen des Tracks ein",
"search": "🔎 Suche",
"_cmd_doc_splayer": (
"Sendet einen interaktiven Spotify-Player (aktiv für 5 Minuten!)"
),
"_cls_doc": (
"Erweiterung für das SpotifyNow-Modul, das es ermöglicht, einen"
" interaktiven Player zu senden."
),
"listening_to": "Ich höre zu",
"download": "📥 Herunterladen",
}
async def _reload_sp(self, once: bool = False):
while True:
self.sp = getattr(self.lookup("SpotifyMod"), "sp", None)
if once:
break
await asyncio.sleep(5)
async def client_ready(self):
self.sp = None
self._tasks = [asyncio.ensure_future(self._reload_sp())]
await self._reload_sp(True)
self._active_forms = []
async def on_unload(self):
for task in self._tasks:
task.cancel()
async def inline_close(self, call: InlineCall):
if any(
call.form.get("uid") == getattr(i, "unit_id", None)
for i in self._active_forms
):
self._active_forms.remove(
next(
i
for i in self._active_forms
if call.form.get("uid") == getattr(i, "unit_id", None)
)
)
await call.delete()
async def sp_previous(self, call: InlineCall):
self.sp.previous_track()
await self.inline_iter(call, True)
async def sp_next(self, call: InlineCall):
self.sp.next_track()
await self.inline_iter(call, True)
async def sp_pause(self, call: InlineCall):
self.sp.pause_playback()
await self.inline_iter(call, True)
async def sp_play(self, call: InlineCall):
self.sp.start_playback()
await self.inline_iter(call, True)
async def sp_shuffle(self, call: InlineCall, state: bool):
self.sp.shuffle(state)
await self.inline_iter(call, True)
async def sp_repeat(self, call: InlineCall, state: bool):
self.sp.repeat(state)
await self.inline_iter(call, True)
async def sp_play_track(self, call: InlineCall, query: str):
try:
track = self.sp.track(query)
except Exception:
search = self.sp.search(q=query, type="track", limit=1)
try:
track = search["tracks"]["items"][0]
except Exception:
return
self.sp.add_to_queue(track["id"])
self.sp.next_track()
async def inline_iter(
self,
call: Union[InlineCall, InlineMessage],
once: bool = False,
uid: str = False,
):
try:
if not uid:
uid = getattr(call, "unit_id", call.form["id"])
until = time.time() + 5 * 60
while (
any(uid == i.unit_id for i in self._active_forms)
and until > time.time()
or once
):
pb = self.sp.current_playback()
is_resuming = (
"actions" in pb
and "disallows" in pb["actions"]
and "resuming" in pb["actions"]["disallows"]
and pb["actions"]["disallows"]["resuming"]
)
try:
artists = [artist["name"] for artist in pb["item"]["artists"]]
except Exception:
artists = []
try:
track = pb["item"]["name"]
track_id = pb["item"]["id"]
except Exception:
track = ""
track_id = ""
full_name = f"{', '.join(artists)} - {track}"
keyboard = [
[
(
{"text": "🔁", "callback": self.sp_repeat, "args": (False,)}
if pb["repeat_state"]
else {
"text": "🔂",
"callback": self.sp_repeat,
"args": (True,),
}
),
{"text": "⏮", "callback": self.sp_previous},
(
{"text": "⏸", "callback": self.sp_pause}
if is_resuming
else {"text": "▶️", "callback": self.sp_play}
),
{"text": "⏭", "callback": self.sp_next},
(
{
"text": "↩️",
"callback": self.sp_shuffle,
"args": (False,),
}
if pb["shuffle_state"]
else {
"text": "🔀",
"callback": self.sp_shuffle,
"args": (True,),
}
),
],
[
{
"text": self.strings("search"),
"input": self.strings("input"),
"handler": self.sp_play_track,
},
{
"text": self.strings("download"),
"callback": self._download,
"args": (full_name,),
},
{"text": "🔗 Link", "url": f"https://song.link/s/{track_id}"},
],
[{"text": "🚫 Close", "callback": self.inline_close}],
]
text = (
f"🎧 <b>{self.strings('listening_to')} {full_name}</b>\n<code>{create_bar(pb)}</code><a"
f" href='https://song.link/s/{track_id}'>\u206f</a>"
)
await call.edit(
text,
reply_markup=keyboard,
disable_web_page_preview=False,
)
if once:
break
await asyncio.sleep(10)
except Exception:
logger.exception("BRUH")
async def _download(self, call: InlineCall, track: str):
await call.answer(self.strings("download"))
await self.allmodules.commands["sfind"](
await call.form["caller"].reply(
f"<code>{self.get_prefix()}sfind {utils.escape_html(track)}</code>"
)
)
@loader.command(
ru_doc="Отправляет интерактивный плеер Spotify (активен в течение 5 минут!)",
it_doc="Invia un player interattivo di Spotify (attivo per 5 minuti!)",
de_doc="Sendet einen interaktiven Spotify-Player (aktiv für 5 Minuten!)",
tr_doc="Etkin Spotify oynatıcı gönderir (5 dakika boyunca aktif!)",
uz_doc="Faol Spotify oynatuvchisini yuboradi (5 daqiqada aktiv!)",
es_doc=(
"Envía un reproductor interactivo de Spotify (activo durante 5 minutos!)"
),
kk_doc="Интерактивті Spotify ойындысын жібереді (5 минутта актив!)",
)
async def splayer(self, message: Message):
"""Send interactive Spotify player (active only for 5 minutes!)"""
form = await self.inline.form(
"<b>🐻 Bear with us, while player is loading...</b>", message=message
)
self._active_forms += [form]
self._tasks += [asyncio.ensure_future(self.inline_iter(form))]