-
Notifications
You must be signed in to change notification settings - Fork 0
/
demo.py
45 lines (38 loc) · 1.03 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
import os
import time
import Adafruit_DHT
from dotenv import load_dotenv
from paho.mqtt import publish
load_dotenv()
host = os.getenv('HOST')
port = os.getenv('MQTT_BROKER_PORT')
dataset = {
"name": "RPI-Test-Device",
"cmd": "",
"data": ""
}
def get_DHT():
pin = 4
sensor = Adafruit_DHT.DHT11
h, t = Adafruit_DHT.read_retry(sensor, pin)
if h is not None and t is not None :
dataset["cmd"] = "temperature"
dataset["data"] = 0
dataset["temperature"] = t
publish.single("DataTopic", json.dumps(dataset),hostname=host, port=port)
dataset["cmd"] = "humidity"
dataset["data"] = 0
dataset["humidity"] = h
publish.single("DataTopic", json.dumps(dataset),hostname=host, port=port)
print("Temperature = {0:0.1f}*C Humidity = {1:0.1f}%".format(t, h))
else :
print('Read error')
try:
while True :
get_DHT()
time.sleep(1)
except KeyboardInterrupt:
print("Terminated by Keyboard")
finally:
print("End of Program")