This repository has been archived by the owner on Oct 10, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdevice.py
105 lines (86 loc) · 3.15 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""The Recteq integration."""
import logging
import pytuya
import async_timeout
from datetime import timedelta
from time import time
from threading import Lock
from .const import DOMAIN, CONF_NAME, DPS_ATTRS
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.helpers import update_coordinator
# Number of times update() attempts to poll the device before giving up.
MAX_RETRIES = 3
# Maximum age, in seconds, of a cached status before update() polls again.
CACHE_SECONDS = 20
# Seconds between refreshes scheduled by the RecteqDevice coordinator.
UPDATE_INTERVAL = 30
# Module-level logger; also handed to the coordinator base class.
_LOGGER = logging.getLogger(__name__)
class TuyaOutletDeviceWrapper:
    """Wrap pytuya.OutletDevice with a cached status and serialized polls.

    The last ``dps`` mapping returned by the device is kept for up to
    CACHE_SECONDS so that repeated reads do not hit the device, and a lock
    ensures only one poll runs at a time.
    """

    def __init__(self, device_id, ip_address, local_key, protocol):
        """Create the underlying device and perform an initial poll.

        Args:
            device_id: Device id passed to pytuya.OutletDevice.
            ip_address: Address passed to pytuya.OutletDevice.
            local_key: Local key passed to pytuya.OutletDevice.
            protocol: Protocol version; converted with float() and passed
                to the device's set_version().

        Raises:
            ConnectionError: If the initial poll fails on every attempt.
        """
        self._device_id = device_id
        self._ip_address = ip_address
        self._local_key = local_key
        self._protocol = protocol
        self._device = pytuya.OutletDevice(device_id, ip_address, local_key)
        self._device.set_version(float(protocol))
        self._cached_status = None
        self._cached_status_time = None
        self._lock = Lock()
        self.update()

    @property
    def available(self):
        """Return True when a status from the device is currently cached."""
        return self._cached_status is not None

    def set_status(self, dps, value):
        """Write *value* to data point *dps* and invalidate the cache.

        The cache is dropped before the write so the next update() polls
        the device instead of serving a status that predates the change.

        Returns:
            Whatever the underlying device's set_status() returns.
        """
        self._cached_status = None
        self._cached_status_time = None
        # NOTE: the wrapped device takes (value, dps), the reverse of this API.
        return self._device.set_status(value, dps)

    def get_status(self, dps):
        """Return the cached value of data point *dps*.

        Returns:
            The cached value, or None when nothing is cached or the device
            did not report that data point.
        """
        # Read the attribute once so a concurrent invalidation between the
        # None check and the lookup cannot raise.
        status = self._cached_status
        if status is None:
            return None
        return status.get(dps)

    def update(self):
        """Poll the device unless a sufficiently fresh status is cached.

        The poll is attempted up to MAX_RETRIES times. If every attempt
        raises ConnectionError the cache is cleared (so ``available``
        becomes False) and the last error is re-raised.

        Raises:
            ConnectionError: If all attempts to reach the device fail.
        """
        with self._lock:
            cached_at = self._cached_status_time
            # Compare against None rather than truthiness: an empty dps
            # mapping is still a valid cached answer, and the timestamp may
            # have been cleared by set_status(), which does not take the lock.
            is_fresh = (
                self._cached_status is not None
                and cached_at is not None
                and time() - cached_at <= CACHE_SECONDS
            )
            if is_fresh:
                return

            for attempt in range(1, MAX_RETRIES + 1):
                try:
                    self._cached_status = self._device.status()['dps']
                    self._cached_status_time = time()
                    return
                except ConnectionError:
                    if attempt >= MAX_RETRIES:
                        # Out of attempts: mark the device unavailable.
                        self._cached_status = None
                        self._cached_status_time = time()
                        raise

    async def async_update(self):
        """Run update() without blocking the event loop.

        The blocking poll is handed to the running loop's default executor.
        If the awaiting task is cancelled (for example by a timeout) the
        worker thread still runs the poll to completion.

        Raises:
            ConnectionError: Propagated from update().
        """
        import asyncio

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.update)
class RecteqDevice(update_coordinator.DataUpdateCoordinator):
    """Update coordinator that refreshes a wrapped Tuya outlet on a timer."""

    def __init__(self, hass, entry, outlet: TuyaOutletDeviceWrapper):
        """Set up the coordinator for *entry*, backed by *outlet*.

        The coordinator is named after the entry's CONF_NAME value and
        refreshes every UPDATE_INTERVAL seconds.
        """
        coordinator_name = entry.data[CONF_NAME]
        refresh_every = timedelta(seconds=UPDATE_INTERVAL)
        super().__init__(
            hass,
            _LOGGER,
            name=coordinator_name,
            update_interval=refresh_every,
        )
        self._outlet = outlet

    @property
    def available(self):
        """Mirror the availability reported by the wrapped outlet."""
        outlet = self._outlet
        return outlet.available

    async def _async_update_data(self):
        """Refresh the outlet, bounded by a 5 second timeout.

        Raises:
            UpdateFailed: When the outlet refresh raises ConnectionError.
        """
        try:
            async with async_timeout.timeout(5):
                await self._outlet.async_update()
        except ConnectionError as conn_err:
            failure = update_coordinator.UpdateFailed("Error fetching data")
            raise failure from conn_err

    def status(self, dps):
        """Return the outlet's cached value for data point *dps*."""
        outlet = self._outlet
        return outlet.get_status(dps)

    @property
    def device_state_attributes(self):
        """Return a dict of attribute name to current value per DPS_ATTRS."""
        attributes = {}
        for dps, attr in DPS_ATTRS.items():
            attributes[attr] = self.status(dps)
        return attributes