-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsendinfo.py
64 lines (49 loc) · 1.54 KB
/
sendinfo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from liblo import *
import sys
import time
import re
matrix = open("data_train3_new.txt", "w+")
class MuseServer(ServerThread):
#listen for messages on port 5000
def __init__(self):
ServerThread.__init__(self, 5000)
self.GL=[]
#receive accelrometer data
@make_method('/muse/acc', 'fff')
def acc_callback(self, path, args):
acc_x, acc_y, acc_z = args
# print ("%s %f %f %f" % (path, acc_x, acc_y, acc_z))
#receive EEG data
@make_method('/muse/eeg', 'ffff')
def eeg_callback(self, path, args):
l_ear, l_forehead, r_forehead, r_ear = args
# print ("%s %f %f %f %f" % (path, l_ear, l_forehead, r_forehead, r_ear))
#handle unexpected messages
@make_method(None, None)
def fallback(self, path, args, types, src):
""" print("Unknown message \
\n\t Source: '%s' \
\n\t Address: '%s' \
\n\t Types: '%s ' \
\n\t Payload: '%s'" \
% (src.url, path, types, args))"""
word1 = "relative"
word2 = "absolute"
word3 = "score"
if re.search(word1, path) or re.search(word2, path) or re.search(word3, path):
# print(args)
self.GL = self.GL + args
if re.search("theta_session_score", path):
matrix.write(str(self.GL))
matrix.write("\n")
self.GL = []
try:
server = MuseServer()
except ServerError:
# print("err")
sys.exit()
server.start()
if __name__ == "__main__":
while 1:
time.sleep(1)
matrix.close()