-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathws_client.py
172 lines (147 loc) · 6.56 KB
/
ws_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import asyncio
import ssl
import certifi
from abc import abstractmethod
import websockets
import json
import aiohttp
from api_auth import BybitApiAuth, BinanceApiAuth
import feed
from typing import Coroutine
class WsClient:
_ssl_context: ssl.SSLContext
_sub_message: str
_feed: feed.Feed
def __init__(self, sub_message: str, feed_object: feed.Feed) -> None:
self._ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
self._ssl_context.load_verify_locations(cafile=certifi.where())
self._sub_message = sub_message
self._feed = feed_object
@abstractmethod
async def start(self) -> Coroutine:
pass
@abstractmethod
async def on_connect(self,
websocket: websockets.WebSocketClientProtocol) -> Coroutine:
pass
def on_disconnect(self) -> None:
pass
async def connect(self, uri: str, **kwargs) -> Coroutine:
try:
websocket = await websockets.connect(
uri=uri, ssl=self._ssl_context, **kwargs)
try:
await websocket.send(message=self._sub_message)
return await self.on_connect(websocket=websocket)
except (websockets.ConnectionClosed, TimeoutError) as e:
print(e)
return await self.start()
except (websockets.InvalidHandshake, TimeoutError) as e:
print(e)
return await self.start()
async def http_get(self, uri: str, **kwargs) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url=uri, ssl=self._ssl_context,
**kwargs) as res:
return await res.json()
async def http_post(self, uri: str, data: str, **kwargs) -> dict:
async with aiohttp.ClientSession() as session:
async with session.post(url=uri, data=data, ssl=self._ssl_context,
**kwargs) as res:
return await res.json()
class BinanceWsClient(WsClient):
_BASE_API_ENDPOINT = 'https://dapi.binance.com'
_api_auth: BinanceApiAuth
_depth_snapshot_path = '/dapi/v1/depth?symbol=BTCUSD_PERP&limit=1000'
def __init__(self, api_file_path: str,
feed_object: feed.BinanceFeed) -> None:
self._api_auth = BinanceApiAuth(file_path=api_file_path)
sub_message = json.dumps(
obj={'method': 'SUBSCRIBE', 'params': ['btcusd_perp@depth@100ms']})
super().__init__(sub_message=sub_message, feed_object=feed_object)
async def start(self) -> Coroutine:
return await self.connect(uri='wss://dstream.binance.com/ws/')
def on_disconnect(self) -> None:
self._feed.on_book_reset()
async def get_depth_snapshot(self) -> None:
res = await self.http_get(
uri=self._BASE_API_ENDPOINT + self._depth_snapshot_path)
self._feed.on_depth_snapshot(data=res)
async def get_positions(self) -> None:
res = await self.http_get(
uri=self._BASE_API_ENDPOINT + self._api_auth.get_position_risk_auth(
pair='BTCUSD'),
headers={'X-MBX-APIKEY': self._api_auth.key})
self._feed.on_position_snapshot(data=res)
async def on_connect(self,
websocket: websockets.WebSocketClientProtocol
) -> Coroutine:
while True:
try:
res = json.loads(s=await websocket.recv())
self._feed.on_websocket(data=res)
if res.get('result') is None and res.get('id') == 1:
asyncio.create_task(coro=self.get_depth_snapshot())
asyncio.create_task(coro=self.get_positions())
except (websockets.ConnectionClosed, TimeoutError) as e:
print(e)
self.on_disconnect()
return await self.start()
class BybitWsClient(WsClient):
_BASE_API_ENDPOINT = 'https://api.bybit.com'
_api_auth: BybitApiAuth
_pong_recv = False
_ping_msg = json.dumps(obj={'op': 'ping'})
def __init__(self, api_file_path: str, feed_object: feed.BybitFeed) -> None:
self._api_auth = BybitApiAuth(file_path=api_file_path)
sub_message = json.dumps(
obj={'op': 'subscribe',
'args': ['orderBookL2_25.BTCUSD', 'order', 'execution',
'position']})
super().__init__(sub_message=sub_message, feed_object=feed_object)
async def start(self) -> Coroutine:
return await self.connect(uri=self._api_auth.get_websocket_uri(),
ping_interval=None)
async def get_active_orders(self) -> None:
res = await self.http_get(
uri=(self._BASE_API_ENDPOINT
+ self._api_auth.get_active_orders_auth(symbol='BTCUSD')))
self._feed.on_order_snapshot(data=res)
async def get_positions(self) -> None:
res = await self.http_get(
uri=(self._BASE_API_ENDPOINT
+ self._api_auth.get_position_list_auth(symbol='BTCUSD')))
self._feed.on_position_snapshot(data=res)
async def on_connect(self,
websocket: websockets.WebSocketClientProtocol) -> Coroutine:
heartbeat_t = asyncio.create_task(
coro=self.heartbeat(websocket=websocket))
while True:
try:
res = json.loads(s=await websocket.recv())
if res.get('topic') is not None:
self._feed.on_websocket(data=res)
elif (res.get('request').get('op') == 'subscribe'
and res.get('success') is True):
asyncio.create_task(coro=self.get_active_orders())
asyncio.create_task(coro=self.get_positions())
elif res.get('ret_msg') == 'pong' and res.get('success'):
self._pong_recv = True
except (websockets.ConnectionClosed, TimeoutError) as e:
print(e)
heartbeat_t.cancel()
return await self.start()
async def heartbeat(self,
websocket: websockets.WebSocketClientProtocol
) -> Coroutine:
while True:
try:
await websocket.send(message=self._ping_msg)
await asyncio.sleep(delay=30)
if not self._pong_recv:
return await self.start()
else:
self._pong_recv = False
except (websockets.ConnectionClosed, TimeoutError) as e:
print(e)
return await self.start()