-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
48 lines (37 loc) · 1.31 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from classes.google import GoogleAPI
import argparse
import zmq
import json
import signal
# Path of the JSON file that Server loads as its gesture lookup table.
# It is relative, so it is resolved against the current working directory.
DEFAULT_GESTURES_JSON_FILE = "config/gestures.json"
# Restore the OS default action for SIGINT so that Ctrl-C terminates the
# process directly instead of raising KeyboardInterrupt inside the blocking
# receive loop.
signal.signal(signal.SIGINT, signal.SIG_DFL)
class Server:
    """Forward gestures received over ZeroMQ to the Google Assistant API.

    The gesture lookup table is loaded from a JSON file at construction
    time. ``execute`` then subscribes to a ZeroMQ publisher and, for every
    received message found in the table, passes the stored entry to
    ``GoogleAPI.execute``.
    """

    def __init__(self, gestures_file: "str | None" = None) -> None:
        """Load the gesture lookup table.

        Args:
            gestures_file: Path of the JSON gestures file. When omitted,
                ``DEFAULT_GESTURES_JSON_FILE`` is used.

        Raises:
            SystemExit: If the file cannot be opened or read, or if its
                content is not valid JSON.
        """
        if gestures_file is None:
            gestures_file = DEFAULT_GESTURES_JSON_FILE
        try:
            with open(gestures_file, encoding="utf-8") as f:
                self.gestures = json.load(f)
        except OSError as err:
            # Missing file, permission problem, path is a directory, ...
            raise SystemExit("No gestures file found") from err
        except ValueError as err:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; report them separately so a malformed file is not
            # mistaken for a missing one.
            raise SystemExit(f"Invalid gestures file: {err}") from err

    def execute(self, endpoint: str = "tcp://localhost:6969") -> None:
        """Receive gestures forever and dispatch the recognised ones.

        Args:
            endpoint: ZeroMQ endpoint of the publisher to subscribe to.

        This method only returns by raising: the receive loop is endless,
        and the socket and context are released if an exception escapes it.
        """
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        try:
            socket.connect(endpoint)
            # An empty prefix subscribes to every published message.
            socket.setsockopt(zmq.SUBSCRIBE, b'')

            while True:
                # Block until the publisher sends the next gesture.
                message = socket.recv_string()
                print(f"Received request: {message}")

                # Execute request to Google Assistant APIs or report the
                # unknown gesture.
                if message in self.gestures:
                    print("Executing request to Google Assistant -> ", self.gestures[message])
                    gapi = GoogleAPI()
                    gapi.execute(self.gestures[message])
                else:
                    print("The gesture sent (", message, ") has not been recognized!")
        finally:
            # Close the socket before terminating the context; term() waits
            # for all sockets of the context to be closed.
            socket.close()
            context.term()
def main(args):
    """Build a ``Server`` and run its receive loop.

    Args:
        args: Parsed command-line arguments; currently not used.
    """
    Server().execute()
if __name__ == '__main__':
    # No options are declared; parsing still provides -h/--help and rejects
    # unexpected command-line arguments before the server starts.
    cli_args = argparse.ArgumentParser().parse_args()
    main(cli_args)