Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Complete Spotify Commands #7

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ nokari.log

# spotipy access token
.cache
.spat

# zipped app
nokari.pyz
Expand Down
146 changes: 137 additions & 9 deletions nokari/plugins/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import logging
import operator
import os
import types
Expand Down Expand Up @@ -28,7 +29,11 @@
NoSpotifyPresenceError,
SpotifyClient,
Track,
User,
)
from nokari.utils.spotify.typings import Playlist

_LOGGER = logging.getLogger("nokari.plugins.api")


class API(plugins.Plugin):
Expand Down Expand Up @@ -243,11 +248,14 @@ async def spotify_artist(self, ctx: Context, *, arguments: str) -> None:
spotify_code_url = artist.get_code_url(hikari.Color.from_rgb(*colors))
spotify_code = await self.spotify_client._get_spotify_code(spotify_code_url)

overview = await self.spotify_client.rest.artist_overview(artist.id)
top_tracks = await artist.get_top_tracks()
chunks = chunk_from_list(
[
f"{idx}. {track.formatted_url} - \N{fire} {track.popularity}"
for idx, track in enumerate(top_tracks, start=1)
f"{idx}. {track.formatted_url} - \N{fire} {track.popularity} - {plural(track_overview[1]):play,}"
for idx, (track, track_overview) in enumerate(
zip(top_tracks, overview["top_tracks"]), start=1
)
],
1024,
)
Expand All @@ -258,10 +266,18 @@ async def spotify_artist(self, ctx: Context, *, arguments: str) -> None:
hikari.Embed(title="Artist Info")
.set_thumbnail(cover)
.set_image(spotify_code)
.add_field(name="Name", value=artist.formatted_url)
.add_field(
name="Name",
value=artist.formatted_url
+ " <:spverified:903257221234831371>" * overview["verified"],
)
.add_field(
name="Follower Count",
value=f"{plural(artist.follower_count):follower,}",
value=format(plural(artist.follower_count), "follower,"),
)
.add_field(
name="Monthly Listeners",
value=format(plural(overview["monthly_listeners"]), "listener,"),
)
.add_field(name="Popularity", value=f"\N{fire} {artist.popularity}")
)
Expand All @@ -275,13 +291,15 @@ async def spotify_artist(self, ctx: Context, *, arguments: str) -> None:
value=chunk,
)

length = 2
if chunks:
# TODO: implement higher level API for this
length = len(chunks) + 1
length = len(chunks) + 2
initial_embed.set_footer(text=f"Page 1/{length}")

paginator.add_page(initial_embed)

idx = 1
for idx, chunk in enumerate(chunks, start=2):
embed = (
hikari.Embed(title="Top tracks cont.", description=chunk)
Expand All @@ -291,6 +309,17 @@ async def spotify_artist(self, ctx: Context, *, arguments: str) -> None:
)
paginator.add_page(embed)

listeners_embed = (
hikari.Embed(title="Top listeners")
.set_image(initial_embed.image)
.set_thumbnail(initial_embed.thumbnail)
.set_footer(text=f"Page {idx + 1}/{length}")
)
for city, listeners in overview["top_cities"]:
listeners_embed.add_field(
name=str(city), value=format(plural(listeners), "listener,")
)
paginator.add_page(listeners_embed)
await paginator.start()

@utils.checks.require_env(*_spotify_vars)
Expand Down Expand Up @@ -396,14 +425,113 @@ def get_disc_text(disc_number: int) -> str:
@utils.checks.require_env(*_spotify_vars)
@spotify.command(name="playlist")
@core.cooldown(1, 2, lightbulb.cooldowns.UserBucket)
async def spotify_playlist(self, ctx: Context) -> None:
"""Not implemented yet."""
async def spotify_playlist(self, ctx: Context, *, query: str) -> None:
"""Displays the information about a playlist on Spotify."""
if not (playlist := await self.spotify_client.get_item(ctx, query, Playlist)):
return

playlist = await self.spotify_client.ensure_playlist(playlist)
cover = await self.spotify_client._get_album(playlist.cover_url)
colors = self.spotify_client._get_colors(
BytesIO(cover), "top-bottom blur", playlist.cover_url
)[0]

spotify_code_url = playlist.get_code_url(hikari.Color.from_rgb(*colors))
spotify_code = await self.spotify_client._get_spotify_code(spotify_code_url)
_LOGGER.debug("%s", playlist.tracks)
chunks = chunk_from_list(
[
f"{idx}. {track.get_formatted_url(prepend_artists=True)}"
for idx, track in enumerate(playlist.tracks, start=1)
],
1024,
)

paginator = Paginator.default(ctx)

initial_embed = (
hikari.Embed(title="Playlist Info", description=playlist.description)
.set_thumbnail(cover)
.set_image(spotify_code)
.add_field(
name="Name",
value=f"{playlist.formatted_url}",
)
.add_field(
name="Owner",
value=str(playlist.owner),
)
.add_field(name="Total tracks", value=str(playlist.total_tracks))
.add_field(name="Colaborative", value=str(playlist.colaborative))
.add_field(name="Public", value=str(playlist.public))
)

initial_embed.add_field(
name="Tracks",
value=chunks.pop(0),
)

if chunks:
length = len(chunks) + 1
initial_embed.set_footer(text=f"Page 1/{length}")

paginator.add_page(initial_embed)

for idx, chunk in enumerate(chunks, start=2):
embed = (
hikari.Embed(title="Tracks cont.", description=chunk)
.set_image(initial_embed.image)
.set_thumbnail(initial_embed.thumbnail)
.set_footer(text=f"Pages {idx}/{length}")
)
paginator.add_page(embed)

await paginator.start()

@utils.checks.require_env(*_spotify_vars)
@spotify.command(name="user")
@core.cooldown(1, 2, lightbulb.cooldowns.UserBucket)
async def spotify_user(self, ctx: Context) -> None:
"""Not implemented yet."""
async def spotify_user(self, ctx: Context, *, query: str) -> None:
"""Displays the information of a user on Spotify."""
# TODO: add followers, following, and recent played artists
user = await self.spotify_client.get_item_from_id(query, User)
initial_embed = (
hikari.Embed(title="User Info", url=user.url)
.add_field("Name", user.display_name)
.add_field("ID", user.id)
.add_field("Follower count", str(user.follower_count))
.set_thumbnail(user.avatar_url or None)
)

paginator = Paginator.default(ctx)
playlists = await self.spotify_client.get_user_playlists(user.id)
chunks = chunk_from_list(
[f"{idx}. {playlist}" for idx, playlist in enumerate(playlists, start=1)],
1024,
)
if chunk := chunks.pop(0):
initial_embed.add_field(
name="User playlists",
value=chunk,
)

if chunks:
# TODO: implement higher level API for this
length = len(chunks) + 1
initial_embed.set_footer(text=f"Page 1/{length}")

paginator.add_page(initial_embed)

for idx, chunk in enumerate(chunks, start=2):
embed = (
hikari.Embed(title="User playlists cont.", description=chunk)
.set_image(initial_embed.image)
.set_thumbnail(initial_embed.thumbnail)
.set_footer(text=f"Page {idx}/{length}")
)
paginator.add_page(embed)

await paginator.start()

@utils.checks.require_env(*_spotify_vars)
@spotify.command(name="cache")
Expand Down
2 changes: 1 addition & 1 deletion nokari/plugins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ async def get_page(pag: Paginator) -> typing.Tuple[hikari.Embed, int]:
if not records:
return get_embed("There is nothing here yet ._.", 0, 1, 1, "prolog"), 1

table: typing.List[typing.Union[str]] = sum(
table: typing.List[str] = sum(
[
list(
zip_longest(
Expand Down
3 changes: 1 addition & 2 deletions nokari/utils/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ class plural:
"""This will append s to the word if the value isn't 1."""

def __init__(self, value: int) -> None:
self.value = value
self.value = int(value)

def __format__(self, format_spec: str) -> str:
v = self.value
fmt = ""

# should I even use endswith here? w/e
if format_spec[-1] == ",":
format_spec = format_spec[:-1]
fmt = ","
Expand Down
5 changes: 3 additions & 2 deletions nokari/utils/paginator.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,15 @@ async def kwargs(self) -> Dict[str, Any]:

if isinstance(content, hikari.Embed):
kwargs["embed"] = content
kwargs["content"] = None

if not content.color:
content.color = self.ctx.color
else:
kwargs["embeds"] = []
kwargs["content"] = content

if self.is_paginated:
kwargs["component"] = self.component
kwargs["components"] = [self.component] if self.is_paginated else []

await asyncio.gather(
*[
Expand Down
54 changes: 49 additions & 5 deletions nokari/utils/spotify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import datetime
import logging
import re
import textwrap
import time
Expand Down Expand Up @@ -30,10 +31,13 @@
from .errors import LocalFilesDetected, NoSpotifyPresenceError
from .rest import SpotifyRest
from .typings import Artist # re-export
from .typings import User # re-export
from .typings import (
_RGB,
Album,
AudioFeatures,
Playlist,
SimplifiedPlaylist,
SongMetadata,
Spotify,
Track,
Expand All @@ -47,6 +51,7 @@

_RGBs = typing.List[_RGB]
PI_RAD: int = 180
_LOGGER = logging.getLogger("nokari.utils.spotify")


SPOTIFY_URL = re.compile(
Expand Down Expand Up @@ -789,7 +794,7 @@ def get_item(
raise RuntimeError("Please pass in either URI/URL/name.")

try:
id = self._get_id(type.type, id_or_query)
id = self._get_id(type.type, id_or_query.strip("<>"))
except RuntimeError:
return self.search_and_pick_item(ctx, id_or_query, type)
else:
Expand All @@ -802,6 +807,7 @@ async def get_item_from_id(self, _id: str, /, type: typing.Type[T]) -> T:
return item

res = await getattr(self.rest, type.type)(_id)
_LOGGER.debug("%s %s", _id, res)
item = self.cache.set_item(type.from_dict(self, res))
return item

Expand Down Expand Up @@ -835,6 +841,12 @@ async def search(self, q: str, /, type: typing.Type[T]) -> typing.List[T]:
res = await self.rest.albums(ids)
raw_items = res[plural]

elif type is Playlist:
return typing.cast(
typing.List[T],
[SimplifiedPlaylist.from_dict(self, item) for item in raw_items],
)

items = [type.from_dict(self, item) for item in raw_items]
self.cache.update_items(items)
return items
Expand All @@ -856,6 +868,7 @@ async def search_and_pick_item(
("Choose an album", "No album was found..."),
"{item.artists_str} - {item}",
),
"playlist": (("Choose a playlist", "No playlist was found..."), "{item}"),
}
items = await self.search(q, type)
title, format = tnf[type.type]
Expand Down Expand Up @@ -914,9 +927,21 @@ async def pick_from_sequence(
)
return seq[int(interaction.values[0])]

await respond.delete()
with suppress(hikari.NotFoundError):
await respond.delete()

return None

async def ensure_playlist(
self, playlist: SimplifiedPlaylist | Playlist
) -> Playlist:
# pylint: disable=unidiomatic-typecheck
if type(playlist) is SimplifiedPlaylist:
_LOGGER.debug("Simplified")
return await self.get_item_from_id(playlist.id, Playlist)

return typing.cast(Playlist, playlist)

async def get_audio_features(self, _id: str) -> AudioFeatures:
audio_features = self.cache.audio_features.get(_id)

Expand All @@ -928,6 +953,27 @@ async def get_audio_features(self, _id: str) -> AudioFeatures:
self.cache.set_item(audio_features)
return audio_features

async def get_user_playlists(self, user_id: str) -> typing.List[SimplifiedPlaylist]:
ids = self.cache.user_playlists.get(user_id)
if ids is not None:
playlists: typing.List[SimplifiedPlaylist] = []
playlist_cache = self.cache.user_playlists
for id in ids:
if not (playlist := playlist_cache.get(id)):
break

playlists.append(playlist)
else:
return playlists

res = await self.rest.user_playlists(user_id)
playlists = [
SimplifiedPlaylist.from_dict(self, playlist) for playlist in res["items"]
]
self.cache.update_items(playlists)
self.cache.user_playlists[user_id] = [playlist.id for playlist in playlists]
return playlists

async def get_top_tracks(
self, artist_id: str, country: str = "US"
) -> typing.List[Track]:
Expand All @@ -936,9 +982,7 @@ async def get_top_tracks(
top_tracks: typing.List[Track] = []
track_cache = self.cache.tracks
for id in ids:
track = track_cache.get(id)

if not track:
if not (track := track_cache.get(id)):
break

top_tracks.append(track)
Expand Down
Loading