-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathfeed.py
119 lines (94 loc) · 4.05 KB
/
feed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from typing import Union, Tuple
from abc import abstractmethod
from order_book import BybitOrderBook, BinanceOrderBook
import strategy
class Feed:
_last_bbo: Tuple[float, float]
_strategy: strategy.Strategy
def __init__(self, strat: strategy.Strategy) -> None:
self._strategy = strat
@abstractmethod
def on_websocket(self, data: dict) -> None:
pass
def on_order_snapshot(self, data: dict) -> None:
pass
def on_position_snapshot(self, data: dict) -> None:
pass
def on_book_reset(self):
pass
def on_depth_snapshot(self, data):
pass
class BybitFeed(Feed):
_order_book: BybitOrderBook
def __init__(self, strat: strategy.Strategy) -> None:
super().__init__(strat=strat)
def on_websocket(self, data: dict) -> None:
if data.get('topic') == 'orderBookL2_25.BTCUSD':
self.handle_order_book_l2(data=data)
elif data.get('topic') == 'order':
self._strategy.on_bybit_order_update(data=data)
elif data.get('topic') == 'execution':
self._strategy.on_bybit_execution(data=data)
def handle_order_book_l2(self, data: dict) -> None:
if data.get('type') == 'snapshot':
self._order_book = BybitOrderBook(
depth_snapshot=data)
curr_bbo = (self._order_book.bids[0][0],
self._order_book.asks[0][0])
self._strategy.on_bybit_bbo_chg(data=curr_bbo)
self._last_bbo = curr_bbo
else:
self._order_book.handle_delta(delta_message=data)
curr_bbo = (self._order_book.bids[0][0],
self._order_book.asks[0][0])
if curr_bbo != self._last_bbo:
self._strategy.on_bybit_bbo_chg(data=curr_bbo)
self._last_bbo = curr_bbo
def on_order_snapshot(self, data: dict) -> None:
self._strategy.on_bybit_order_snap(data=data)
def on_position_snapshot(self, data: dict) -> None:
self._strategy.on_bybit_position_snap(data=data)
class BinanceFeed(Feed):
_buf_depth_updates = []
_order_book: Union[None, BinanceOrderBook] = None
def __init__(self, strat: strategy.Strategy) -> None:
super().__init__(strat=strat)
def on_websocket(self, data: dict) -> None:
if data.get('e') == 'depthUpdate':
self.handle_book_delta(data=data)
def on_depth_snapshot(self, data: dict) -> None:
self.handle_book_snapshot(data=data)
def on_position_snapshot(self, data: list) -> None:
for pos in data:
if pos.get('symbol') == 'BTCUSD_PERP':
self._strategy.on_binance_position_snap(data=pos)
break
def on_book_reset(self) -> None:
self._order_book = None
def handle_book_delta(self, data: dict) -> None:
if self._order_book is None:
self._buf_depth_updates.append(data)
else:
self._order_book.parse_update(depth_update=data)
curr_bbo = (self._order_book.bids[0][0],
self._order_book.asks[0][0])
if curr_bbo != self._last_bbo:
self._strategy.on_binance_bbo_chg(data=curr_bbo)
self._last_bbo = curr_bbo
def handle_book_snapshot(self, data: dict) -> None:
self._order_book = BinanceOrderBook(depth_snapshot=data)
self.remove_prior_depth_updates(depth_snapshot=data)
for update in self._buf_depth_updates:
self._order_book.parse_update(depth_update=update)
self._buf_depth_updates.clear()
curr_bbo = (self._order_book.bids[0][0],
self._order_book.asks[0][0])
self._strategy.on_binance_bbo_chg(data=curr_bbo)
self._last_bbo = curr_bbo
def remove_prior_depth_updates(self, depth_snapshot: dict):
removed_updates = []
for update in self._buf_depth_updates:
if update.get('u') < depth_snapshot.get('lastUpdateId'):
removed_updates.append(update)
for update in removed_updates:
self._buf_depth_updates.remove(update)