-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathestim2bCom.py
137 lines (115 loc) · 3.25 KB
/
estim2bCom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Chaturbate E-stim 2B controller, Connects Chaturbate chat to E-Stim systems 2B
# Copyright (C) 2021 cb-stimmer
import asyncio
import functools
import json
import os
import random
import re
import string
import sys
import threading
import time
import traceback
import urllib.request
from datetime import datetime
from numbers import Number
from queue import Empty, SimpleQueue
import queue
import warnings
import estim2b
from Tip import Tip
class estim2bCom(object):
    """Drive an E-Stim 2B unit from a queue of tips.

    The unit is reached through ``estim2b.Estim`` on ``port``.  Tip amounts
    are translated into output levels using two tables of dicts, each entry
    carrying the keys ``"ammount"``, ``"levelA"``, ``"levelB"`` and
    ``"time"``:

    * ``special_tips`` - matched when the tip equals ``"ammount"`` exactly;
    * ``tip_levels``   - matched by the highest ``"ammount"`` that the tip
      reaches or exceeds.
    """

    def __init__(self, port, tip_levels, special_tips, mode, power):
        """Store the configuration and immediately open/configure the unit.

        Args:
            port: Passed to ``estim2b.Estim`` as the device to open.
            tip_levels: Threshold table (see class docstring).
            special_tips: Exact-match table (see class docstring).
            mode: Passed unchanged to ``Estim.setMode``.
            power: ``"L"`` selects low, ``"H"`` selects high, anything else
                selects dynamic.
        """
        super().__init__()
        self.tipLevels = tip_levels
        self.specialTips = special_tips
        self.mode = mode
        self.power = power
        self.port = port
        self.setup()

    def setup(self):
        """Open the device, apply mode and power range, then kill outputs."""
        print(self.port)
        self.e2b = estim2b.Estim(self.port, verbose=False)
        self.e2b.status()
        self.e2b.setMode(self.mode)
        self._apply_power()
        # Start from a known-safe state with the outputs off.
        self.e2b.kill()

    def _apply_power(self):
        """Call the power-range setter on the device matching ``self.power``."""
        # The setters must be *called*; merely referencing the attribute
        # (as the code previously did) leaves the unit's range unchanged.
        if self.power == "L":
            self.e2b.setLow()
        elif self.power == "H":
            self.e2b.setHigh()
        else:
            self.e2b.setDynamic()

    def do_comm(self, timeout, val):
        """Set both channels to ``val`` with ``timeout`` as the third argument.

        Non-numeric ``val`` is ignored; the closing log line is always
        printed.
        """
        if isinstance(val, Number):
            print("do_comm with " + str(timeout) + 's@' +
                  str(val))
            self.e2b.setOutputs(val, val, timeout)
        print("end do comm")

    def getAmmount(self, e):
        """Return the ``'ammount'`` field of a level entry (sort key)."""
        return e['ammount']

    def process_tip(self, ammount):
        """Apply the output levels configured for a tip of ``ammount``.

        Exact matches in ``specialTips`` win.  Otherwise the entry of
        ``tipLevels`` with the largest ``"ammount"`` not exceeding the tip is
        used.  When nothing matches, the device is left untouched.
        """
        for lvl in self.specialTips:
            if lvl["ammount"] == ammount:
                self.e2b.setOutputs(lvl["levelA"], lvl["levelB"], lvl["time"])
                return
        # Descending order so the first threshold reached is the highest one.
        for lvl in sorted(self.tipLevels, key=self.getAmmount, reverse=True):
            if ammount >= lvl["ammount"]:
                self.e2b.setOutputs(lvl["levelA"], lvl["levelB"], lvl["time"])
                return

    @staticmethod
    def _drain_queue(tips_queue):
        """Discard every item currently waiting in ``tips_queue``."""
        # Standard-library queues have no clear(); honour one if a custom
        # queue type provides it, otherwise pop until the queue reports Empty.
        clear = getattr(tips_queue, "clear", None)
        if callable(clear):
            clear()
            return
        try:
            while True:
                tips_queue.get_nowait()
        except Empty:
            pass

    async def communicator(self, tips_queue, broadcaster):
        """Consume ``tips_queue`` forever, turning tips into device output.

        Queue items are either ``Tip`` instances or indexable control
        messages whose first element is ``'delay'``, ``'broadcaster'`` or
        ``'levels_reload'``.  Outputs are killed whenever the queue is found
        empty and unconditionally when this coroutine exits.

        Note: the body contains no ``await``; waiting is done with a blocking
        ``tips_queue.get(timeout=0.1)``.

        Args:
            tips_queue: Queue offering ``get(timeout=...)`` and ``empty()``.
            broadcaster: Current broadcaster name; replaced by
                ``'broadcaster'`` control messages.
        """
        # Running total of 'delay' control messages; it is only reported.
        delay = 0
        try:
            while True:
                while True:  # poll so the loop regains control every 0.1 s
                    try:
                        msg = tips_queue.get(timeout=0.1)
                        if isinstance(msg, Tip):
                            print("\n\n Tip recieved " + str(msg.val) + "\n\n")
                            break
                        if msg[0] == 'delay':
                            delay += msg[1]
                            print("new delay: " + str(delay))
                        elif msg[0] == "broadcaster":
                            broadcaster = msg[1]
                            # Pending tips belong to the previous broadcaster.
                            self._drain_queue(tips_queue)
                            print('new broadcaster ' + broadcaster)
                        elif msg[0] == 'levels_reload':
                            print("levels_reload")
                    except Empty:
                        # Nothing queued: keep the outputs off while idle.
                        self.e2b.kill()
                    except Exception:
                        # Best effort: a malformed message must not stop the
                        # loop, but it should be visible. KeyboardInterrupt
                        # and SystemExit are deliberately not caught here.
                        print(traceback.format_exc())
                        self.e2b.kill()
                if tips_queue.empty():
                    self.e2b.kill()
                self.process_tip(msg.val)
        except ValueError:  # non-int in queue
            print("com valueerror")
            print(traceback.format_exc())
            self.e2b.kill()
        except Exception:
            print("comm error")
            print(traceback.format_exc())
            self.e2b.kill()
        finally:
            # No ``return`` here: returning from ``finally`` would discard
            # an in-flight KeyboardInterrupt or cancellation.
            self.e2b.kill()
# class Tip(object):
# val = 0
# timestamp = 0
# def __init__(self, val, timestamp, level=None):
# self.val = val
# self.timestamp = timestamp
# def __str__(self):
# return f'Tip val:{self.val} timestamp:{self.timestamp}'