-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsocket_utils.py
75 lines (63 loc) · 2.65 KB
/
socket_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from time import sleep
from PyQt5.QtCore import QThread
class WebSocketHandler:  # (OCStreamer):
    """Connect the broker API's websocket callbacks to the local stores.

    The handler registers three callbacks with ``api.start_websocket``:
    feed updates are written to ``feedJson``, order updates are written to
    ``orderJson``, and when the socket opens the currently known
    subscription list is re-sent through ``subscriber``.
    """

    # Class-level default so the flag can be read before the socket opens;
    # open_callback() sets it to True on the instance.
    feed_opened = False

    # Seconds to sleep between checks while waiting for the socket to open.
    _OPEN_POLL_INTERVAL = 0.05

    def __init__(self, subscribed_list, feedJson, orderJson, subscriber, api):  # tokenlist,
        """Store the collaborators used by the websocket callbacks.

        Args:
            subscribed_list: Object whose ``get()`` returns the tokens to
                subscribe when the socket opens.
            feedJson: Store exposing ``write(key, value)`` for feed ticks.
            orderJson: Store exposing ``write(key, value)`` for order status.
            subscriber: Object exposing
                ``update_newsublist(tokens, force_subscribe=...)``.
            api: Broker API object exposing ``start_websocket(...)``.
        """
        super(WebSocketHandler, self).__init__()
        self.api = api
        self.subscribedlist = subscribed_list
        self.feedJson = feedJson
        self.orderJson = orderJson
        self.subscriber = subscriber

    def event_handler_feed_update(self, message):
        """Store a feed message under its ``'tk'`` key; ignore others."""
        if 'tk' in message:
            self.feedJson.write(message['tk'], message)

    def event_handler_order_update(self, inmessage):
        """Record an order's status, keyed by its ``'norenordno'`` value.

        Messages lacking either ``'norenordno'`` or ``'status'`` are ignored.
        """
        if 'norenordno' in inmessage and 'status' in inmessage:
            self.orderJson.write(
                inmessage['norenordno'],
                {'status': str(inmessage['status'])},
            )

    def open_callback(self):
        """Mark the feed as open and force-subscribe the known tokens."""
        print("Socket Opened")
        self.feed_opened = True
        sub = self.subscribedlist.get()
        # force_subscribe=True re-sends every token, including ones already
        # recorded as subscribed.
        self.subscriber.update_newsublist(sub, force_subscribe=True)

    def setup_websocket(self):
        """Start the websocket and block until ``open_callback`` has fired.

        NOTE(review): this never times out; if the socket never opens the
        caller blocks forever. Presumably the callbacks are invoked from a
        thread owned by the API - confirm against the API implementation.
        """
        self.api.start_websocket(
            order_update_callback=self.event_handler_order_update,
            subscribe_callback=self.event_handler_feed_update,
            socket_open_callback=self.open_callback,
        )
        # Initial grace period kept from the original implementation.
        sleep(0.5)
        # Sleep between polls instead of spinning: a bare `pass` loop burns a
        # full core and contends for the GIL with the thread that has to set
        # feed_opened.
        while not self.feed_opened:
            sleep(self._OPEN_POLL_INTERVAL)

    def start(self):
        """Start the websocket; blocks until the socket reports open."""
        self.setup_websocket()
class WSSubscriber(QThread):
    """Subscribe instrument tokens through the broker API in batches.

    Tracks already-subscribed tokens in ``subscribedlist`` so that ordinary
    (non-forced) updates only send tokens that have not been sent before.
    No ``run()`` override is defined in this section of the file.
    """

    # Maximum number of tokens passed to a single api.subscribe() call.
    BATCH_SIZE = 30

    def __init__(self, subscribed_list, api):  # tokenlist,
        """Store the API handle and the shared subscribed-token list.

        Args:
            subscribed_list: Object exposing ``get()`` and ``append(batch)``.
            api: Broker API object exposing ``subscribe(instrument=...)``.
        """
        super(WSSubscriber, self).__init__()
        self.api = api
        self.subscribedlist = subscribed_list

    def update_newsublist(self, new_list, force_subscribe=False):
        """Subscribe the tokens in ``new_list``.

        Args:
            new_list: Candidate tokens.
            force_subscribe: When True, every token in ``new_list`` is sent
                as-is and the subscribed list is not updated; when False,
                only tokens not already in the subscribed list are sent.
        """
        if force_subscribe:
            new_tokens = new_list
        else:
            new_tokens = self.find_new_items(new_list)
        if new_tokens:
            self.subscribe(new_tokens, force_subscribe=force_subscribe)

    def find_new_items(self, tokens):
        """Return the tokens not yet subscribed, in order, without repeats.

        Duplicates inside ``tokens`` are collapsed to their first occurrence
        so the same instrument is not subscribed and recorded twice.
        """
        seen = set(self.subscribedlist.get())
        new_items = []
        for item in tokens:
            if item not in seen:
                seen.add(item)
                new_items.append(item)
        return new_items

    def subscribe(self, tokens, force_subscribe):
        """Send ``tokens`` to the API in chunks of at most ``BATCH_SIZE``.

        Each chunk is recorded in the subscribed list unless
        ``force_subscribe`` is set.
        """
        for start in range(0, len(tokens), self.BATCH_SIZE):
            tokens_batch = tokens[start:start + self.BATCH_SIZE]
            self.api.subscribe(instrument=tokens_batch)
            if not force_subscribe:
                # NOTE(review): the whole batch is handed to append();
                # presumably the list object flattens it - confirm against
                # its implementation, since find_new_items() builds a set
                # from get().
                self.subscribedlist.append(tokens_batch)