forked from st3fan/mijia-hub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapplication.go
122 lines (102 loc) · 3.34 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"fmt"
"log"
"path"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/pkg/errors"
)
type application struct {
cfg configuration
sensors map[string]*Sensor
writeAPI api.WriteAPI
}
func newApplication(cfg configuration) (*application, error) {
app := &application{
cfg: cfg,
sensors: map[string]*Sensor{},
}
if cfg.InfluxDBServer != "" {
client := influxdb2.NewClient(cfg.InfluxDBServer, cfg.InfluxDBToken)
app.writeAPI = client.WriteAPI(cfg.InfluxDBOrg, cfg.InfluxDBBucket)
}
return app, nil
}
func (app *application) run() error {
log.Printf("[I] Creating state directory <%s>", defaultStateDirectory)
if err := createDirectory(defaultStateDirectory); err != nil {
return errors.Wrapf(err, "Could not create state directory <%s>", defaultStateDirectory)
}
storageDirectory := path.Join(path.Join(defaultStateDirectory, "storage"))
log.Printf("[I] Creating storage directory <%s>", storageDirectory)
if err := createDirectory(storageDirectory); err != nil {
return errors.Wrapf(err, "Could not create storage directory <%s>", storageDirectory)
}
// hc.OnTermination(func() {
// log.Println("hc.OnTermination")
// for address, sensor := range sensors {
// <-sensor.transport.Stop()
// delete(sensors, address)
// }
// time.Sleep(100 * time.Millisecond)
// os.Exit(1)
// })
scanner, err := NewScanner()
if err != nil {
return errors.Wrap(err, "Could not create a Scanner")
}
if err := scanner.Start(); err != nil {
return errors.Wrap(err, "Could not start scanner")
}
subscription := scanner.Subscribe()
for event := range subscription.Events() {
switch event.(type) {
case EventDiscoveredSensor:
if app.cfg.Verbose {
log.Printf("Received EventDiscoveredSensor: %+v", event)
}
app.addSensor(event.(EventDiscoveredSensor).Address, event.(EventDiscoveredSensor).Data)
case EventReceivedSensorData:
if app.cfg.Verbose {
log.Printf("Received EventDiscoveredSensorData: %+v", event)
}
app.updateSensor(event.(EventReceivedSensorData).Address, event.(EventReceivedSensorData).Data)
case EventExpiredSensor:
if app.cfg.Verbose {
log.Printf("Received EventExpiredSensor: %+v", event)
}
app.removeSensor(event.(EventExpiredSensor).Address)
}
}
return nil
}
func (app *application) addSensor(address string, data SensorData) {
if _, found := app.sensors[address]; !found {
sensor, err := NewSensor(address, data, app.cfg.Pin)
if err != nil {
log.Printf("[E] Could not create sensor <%s>: %s", address, err)
return
}
app.sensors[address] = sensor
sensor.Update(data)
}
}
func (app *application) updateSensor(address string, data SensorData) {
if sensor, found := app.sensors[address]; found {
if err := sensor.Update(data); err != nil {
log.Printf("[E] Failed to update Sensor <%s>: %s", address, err)
}
if app.writeAPI != nil {
metrics := fmt.Sprintf("batteryLevel=%d,humidity=%f,temperature=%f", data.BatteryLevel, data.Humidity, data.Temperature)
record := fmt.Sprintf("mijia,sensor=%s %s %d", address, metrics, time.Now().UnixNano())
log.Printf("[D] Influx Record: %s", record)
app.writeAPI.WriteRecord(record)
app.writeAPI.Flush()
}
}
}
func (app *application) removeSensor(address string) {
// TODO Do we just remove the sensor storage?
}