-
Notifications
You must be signed in to change notification settings - Fork 0
/
building_data_requests.py
100 lines (70 loc) · 2.49 KB
/
building_data_requests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Copyright 2018 BACnet Gateway. All rights reserved.
import requests
import json
# Default gateway hostname. post_request substitutes it for an empty hostname
# and uses HTTPS when this host is addressed without an explicit port.
PUBLIC_HOSTNAME = 'energize.andoverma.us'
# Fallback port that post_request applies to hosts other than PUBLIC_HOSTNAME
# when the caller supplies no port. Empty by default.
INTERNAL_PORT = ''
# Request present value and units for the supplied instance
def get_value(facility, instance, gateway_hostname=PUBLIC_HOSTNAME, gateway_port=INTERNAL_PORT, live=False):
    """Request the present value and units of one instance from the gateway.

    Args:
        facility: Facility identifier, sent unchanged as the 'facility' field.
        instance: Instance identifier. A request is issued only when
            str(instance) consists solely of digits and converts to an
            integer greater than zero.
        gateway_hostname: Gateway host, passed through to post_request.
        gateway_port: Gateway port, passed through to post_request.
        live: When truthy, a 'live' field set to True is added to the request.

    Returns:
        A (value, units) tuple. Both elements are None when the instance is
        rejected, or when the response reports failure at either the
        instance level or the data level.

    Raises:
        ValueError: If the response body is not valid JSON (raised by
            json.loads).
        KeyError: If the decoded response lacks one of the expected keys.
        Exceptions raised by post_request propagate unchanged.
    """
    instance_text = str(instance)
    if not instance_text.isdigit():
        return None, None
    try:
        instance_number = int(instance_text)
    except ValueError:
        # isdigit() accepts digit characters (e.g. superscripts) that int()
        # cannot parse; treat those as an invalid instance instead of raising.
        return None, None
    if instance_number <= 0:
        return None, None

    # Set up request arguments
    args = {
        'facility': facility,
        'instance': instance,
    }
    if live:
        args['live'] = True

    # Issue request to web service and decode the JSON body
    gateway_rsp = post_request(gateway_hostname, gateway_port, args)
    dc_inst_rsp = json.loads(gateway_rsp.text)['instance_response']

    # Both the instance response and its nested data must report success
    if not dc_inst_rsp['success']:
        return None, None
    dc_data = dc_inst_rsp['data']
    if not dc_data['success']:
        return None, None

    return dc_data['presentValue'], dc_data['units']
# Request multiple values from BACnet Gateway
def get_bulk(bulk_request, gateway_hostname=PUBLIC_HOSTNAME, gateway_port=INTERNAL_PORT):
    """Send a bulk request to the gateway and return the decoded response.

    Args:
        bulk_request: The request items. Only a non-empty list is sent; it is
            serialized with json.dumps into the 'bulk' field.
        gateway_hostname: Gateway host, passed through to post_request.
        gateway_port: Gateway port, passed through to post_request.

    Returns:
        The JSON-decoded response body, or an empty list when bulk_request is
        not a list or is empty (no request is issued in that case).
    """
    # Anything other than a non-empty list short-circuits without a request
    if not (isinstance(bulk_request, list) and bulk_request):
        return []

    payload = {'bulk': json.dumps(bulk_request)}
    response = post_request(gateway_hostname, gateway_port, payload)
    return json.loads(response.text)
# Post request to web service
def post_request(gateway_hostname, gateway_port, args, timeout=None):
    """POST form-encoded arguments to the gateway and return the response.

    URL selection:
      * PUBLIC_HOSTNAME with no port  -> https://<host>
      * PUBLIC_HOSTNAME with a port   -> http://<host>:<port>
      * any other host with a port    -> http://<host>:<port>
      * any other host with no port   -> http://<host>, plus ':' and
        INTERNAL_PORT when INTERNAL_PORT is non-empty

    Args:
        gateway_hostname: Target host; a falsy value selects PUBLIC_HOSTNAME.
        gateway_port: Target port (any type accepted by str()); a falsy value
            means no port was supplied.
        args: Mapping sent as the POST form data.
        timeout: Optional timeout forwarded to requests.post. The default,
            None, waits indefinitely, as the request did before this
            parameter existed.

    Returns:
        The response object returned by requests.post.
    """
    # Normalize hostname
    if not gateway_hostname:
        gateway_hostname = PUBLIC_HOSTNAME

    # Normalize port
    gateway_port = str(gateway_port) if gateway_port else ''

    is_public = gateway_hostname == PUBLIC_HOSTNAME

    # Only the public host addressed without an explicit port uses SSL
    scheme = 'https' if is_public and not gateway_port else 'http'

    # Other hosts fall back to the configured internal port, if there is one
    if not is_public and not gateway_port:
        gateway_port = str(INTERNAL_PORT) if INTERNAL_PORT else ''

    # Format URL; add the port separator only when there is a port, so the
    # URL never ends in a dangling ':'
    url = scheme + '://' + gateway_hostname
    if gateway_port:
        url += ':' + gateway_port

    # Post request
    return requests.post(url, data=args, timeout=timeout)