-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
82 lines (62 loc) · 2.28 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from influxdb import InfluxDBClient
import paho.mqtt.client as paho
from datetime import datetime
from threading import Thread
import json
import requests
# Define broker and port to listen to
broker = "localhost"
port = 1883
#slackUrl = "https://hooks.slack.com/services/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"
#slackHeader = "Content-type: application/json"
def post_message(data):
slackUrl = "https://hooks.slack.com/services/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"
slackHeader = {"Content-type": "application/json"}
data = {"text": data["message"]}
slackPayload = json.dumps(data)
x = requests.post(slackUrl, data=slackPayload, headers=slackHeader)
print(x)
#function called when a message is recieved
def on_message(client, userdata, message):
# Print data
print("message recieved", str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("----------------------------------------------")
# Start new thread - Insert data into db
p = Thread(target=insert, args=(message.payload, message.topic,))
p.start()
p.join()
# Function to insert data into influxdb
def insert(data, measurement):
# Get the measurement
measurement = measurement.split("/")[1]
if(measurement == "messages"):
post_message(json.loads(data.decode("utf-8")))
# Generate the json body to insert
json_body = [
{
"measurement" : measurement,
"tags": {
},
"time": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
"fields": {
},
}
]
# Add the payload data to the insert json
json_body[0]["fields"] = json.loads(data.decode("utf-8"))
print(json_body)
# Write data to the database
dbClient.write_points(json_body)
# Connect to the database
dbClient = InfluxDBClient(host="localhost", port=8086, database="BinBotStats", username='binbot', password='b33pb00p!!')
# Create a mqtt client
client = paho.Client("Client1")
# Define which function to call when message is recived
client.on_message=on_message
# Connect to the database
client.connect(broker, port)
# Subscribe to all of the messages on the binBot base topic
client.subscribe("binBot/#")
# Loop forever
client.loop_forever()