Skip to content

Commit

Permalink
Merge pull request #20 from domschl/socketfixes
Browse files Browse the repository at this point in the history
Socket problems with timeouts and EventQueue,
* fixes #18, 
* fixes #19
  • Loading branch information
domschl authored Sep 26, 2019
2 parents cdb3607 + cc42f35 commit 8a9d202
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 23 deletions.
46 changes: 35 additions & 11 deletions fhem/fhem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import re
import socket
import errno
import ssl
import threading
import time
Expand Down Expand Up @@ -324,25 +325,28 @@ def _recv_nonblocking(self, timeout=0.1):
try:
data = self.sock.recv(32000)
except socket.error as err:
self.log.debug(
"Exception in non-blocking. Error: {}".format(err))
# Resource temporarily unavailable, operation did not complete are expected
if err.errno != errno.EAGAIN and err.errno!= errno.ENOENT:
self.log.debug(
"Exception in non-blocking (1). Error: {}".format(err))
time.sleep(timeout)

wok = 1
while len(data) > 0 and wok > 0:
time.sleep(timeout)
datai = b''

try:
datai = self.sock.recv(32000)
if len(datai) == 0:
wok = 0
else:
data += datai
except socket.error as err:
# Resource temporarily unavailable, operation did not complete are expected
if err.errno != errno.EAGAIN and err.errno!= errno.ENOENT:
self.log.debug(
"Exception in non-blocking (2). Error: {}".format(err))
wok = 0
self.log.debug(
"Exception in non-blocking. Error: {}".format(err))
self.sock.setblocking(True)
return data

Expand Down Expand Up @@ -701,6 +705,7 @@ def __init__(self, server, que, port=7072, protocol='telnet',
# self.set_loglevel(loglevel)
self.log = logging.getLogger('FhemEventQueue')
self.informcmd = "inform timer"
self.timeout = timeout
if serverregex is not None:
self.informcmd += " " + serverregex
if protocol != 'telnet':
Expand All @@ -709,6 +714,7 @@ def __init__(self, server, que, port=7072, protocol='telnet',
self.fhem = Fhem(server=server, port=port, use_ssl=use_ssl, username=username,
password=password, cafile=cafile, loglevel=loglevel)
self.fhem.connect()
time.sleep(timeout)
self.EventThread = threading.Thread(target=self._event_worker_thread,
args=(que, filterlist,
timeout, eventtimeout))
Expand All @@ -734,18 +740,29 @@ def set_loglevel(self, level):

def _event_worker_thread(self, que, filterlist, timeout=0.1,
eventtimeout=120):
self.log.debug("FhemEventQueue worker thread starting...")
if self.fhem.connected() is not True:
self.log.warning("EventQueueThread: Fhem is not connected!")
time.sleep(timeout)
self.fhem.send_cmd(self.informcmd)
data = ""
first = True
lastreceive = time.time()
eventThreadActive = True
while eventThreadActive is True:
self.eventThreadActive = True
while self.eventThreadActive is True:
while self.fhem.connected() is not True:
self.fhem.connect()
if self.fhem.connected():
time.sleep(timeout)
lastreceive = time.time()
self.fhem.send_cmd(self.informcmd)
else:
self.log.warning("Fhem is not connected in EventQueue thread, retrying!")
time.sleep(5.0)

if first is True:
first = False
self.log.debug("FhemEventQueue worker thread active.")
time.sleep(timeout)
if time.time() - lastreceive > eventtimeout:
self.log.debug("Event-timeout, refreshing INFORM TIMER")
self.fhem.send_cmd(self.informcmd)
Expand All @@ -762,9 +779,13 @@ def _event_worker_thread(self, que, filterlist, timeout=0.1,
if len(li) > 4:
dd = li[0].split('-')
tt = li[1].split(':')
dt = datetime.datetime(int(dd[0]), int(dd[1]),
int(dd[2]), int(tt[0]),
int(tt[1]), int(tt[2]))
try:
dt = datetime.datetime(int(dd[0]), int(dd[1]),
int(dd[2]), int(tt[0]),
int(tt[1]), int(tt[2]))
except:
self.log.debug("EventQueue: invalid date format in date={} time={}, event {} ignored".format(li[0],li[1],l))
continue
devtype = li[2]
dev = li[3]
val = ''
Expand Down Expand Up @@ -816,10 +837,13 @@ def _event_worker_thread(self, que, filterlist, timeout=0.1,
'unit': unit
}
que.put(ev)
# self.log.debug("Event queued for {}".format(ev['device']))
time.sleep(timeout)
self.fhem.close()
self.log.debug("FhemEventQueue worker thread terminated.")
return

def close(self):
'''Stop event thread and close socket.'''
self.eventThreadActive = False
time.sleep(0.5+self.timeout)
24 changes: 21 additions & 3 deletions selftest/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
## Automatic FHEM installation and self-test for CI.
## Automatic FHEM installation and python-fhem API self-test for CI.

The selftest tree is only used for continous integration with TravisCI.

The scripts automatically download the latest FHEM release, install and run it and then use the Python API to
The scripts automatically download the latest FHEM release, install, configure and run it and then use the Python API to
perform self-tests.

**Note**: Be careful when using this script, e.g. the install-class ***completely erases*** the existing FHEM installation
Tests performed:
* All tests are run with both python 2.7 and python 3.x
* FHEM connections via sockets, secure sockets, HTTP and HTTPS with password.
* Automatic creation of devices on Fhem (using all connection variants)
* Aquiring readings from Fhem using all different connection types and python versions

**WARNING**: Be careful when using this script, e.g. the install-class ***completely erases*** the existing FHEM installation
within the selftest tree (and all configuration files) to allow clean tests.

### Environment notes

Fhem requires the perl module IO::Socket::SSL for secure socket and HTTPS protocotls.

It needs to be installed with either:

* `cpan -i IO::Socket::SSL`
* or `apt-get install libio-socket-ssl-perl`
* or `pacman -S perl-io-socket-ssl`

If selftests fails on the first SSL connection, it is usually a sign, that the fhem-perl requirements for SSL are not installed.
1 change: 1 addition & 0 deletions selftest/fhem-config-addon.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ attr allowTelPort2 validFor telnetPort2

# HTTPS requires IO::Socket::SSL, to be installed with cpan -i IO::Socket::SSL
# or apt-get install libio-socket-ssl-perl
# or pacman -S perl-io-socket-ssl
define WEBS FHEMWEB 8084 global
attr WEBS HTTPS 1
attr WEBS sslVersion TLSv12:!SSLv3
Expand Down
87 changes: 78 additions & 9 deletions selftest/selftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import logging
import time

try:
# Python 3.x
import queue
except:
# Python 2.x
import Queue as queue

try:
# Python 3.x
from urllib.parse import quote
Expand Down Expand Up @@ -121,21 +128,26 @@ def shutdown(self, fhem_url='localhost', protocol='http', port=8083):
self.log.warning("Fhem shutdown complete.")


def create_device(fhem, name, readings):
fhem.send_cmd("define {} dummy".format(name))
fh.send_cmd("attr {} setList state:on,off".format(name))
fh.send_cmd("set {} on".format(name))
def set_reading(fhi, name, reading, value):
fhi.send_cmd("setreading {} {} {}".format(name, reading, value))

def create_device(fhi, name, readings):
fhi.send_cmd("define {} dummy".format(name))
fhi.send_cmd("attr {} setList state:on,off".format(name))
fhi.send_cmd("set {} on".format(name))
readingList = ""
for rd in readings:
if readingList != "":
readingList += " "
readingList += rd
fh.send_cmd("attr {} readingList {}".format(name, readingList))
fhi.send_cmd("attr {} readingList {}".format(name, readingList))
for rd in readings:
fh.send_cmd("setreading {} {} {}".format(name, rd, readings[rd]))
set_reading(fhi,name,rd,readings[rd])


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s.%(msecs)03d %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
print("Start FhemSelfTest")
st = FhemSelfTester()
print("State 1: Object created.")
Expand Down Expand Up @@ -190,12 +202,12 @@ def create_device(fhem, name, readings):
connections = [
{'protocol': 'http',
'port': 8083},
{'protocol': 'telnet',
'port': 7072},
{'protocol': 'telnet',
'port': 7073,
'use_ssl': True,
'password': 'secretsauce'},
{'protocol': 'telnet',
'port': 7072},
{'protocol': 'https',
'port': 8084},
{'protocol': 'https',
Expand All @@ -205,7 +217,9 @@ def create_device(fhem, name, readings):
]

first = True

print("")
print("----------------- Fhem ------------")
print("Testing python-fhem Fhem():")
for connection in connections:
print('Testing connection to {} via {}'.format(
config['testhost'], connection))
Expand Down Expand Up @@ -264,7 +278,62 @@ def create_device(fhem, name, readings):
sys.exit(-7)
else:
print("states received: {}, ok.".format(len(states)))
fh.close()
print("")

print("")
print("---------------Queues--------------------------")
print("Testing python-fhem telnet FhemEventQueues():")
for connection in connections:
if connection['protocol'] != 'telnet':
continue
print('Testing connection to {} via {}'.format(
config['testhost'], connection))
fh = fhem.Fhem(config['testhost'], **connections[0])

que = queue.Queue()
que_events=0
fq = fhem.FhemEventQueue(config['testhost'], que, **connection)

devs = [
{'name': 'clima_sensor1',
'readings': {'temperature': 18.2,
'humidity': 88.2}},
{'name': 'clima_sensor2',
'readings': {'temperature': 19.1,
'humidity': 85.7}}
]
time.sleep(1.0)
for dev in devs:
for i in range(10):
print("Repetion: {}".format(i+1))
for rd in dev['readings']:
set_reading(fh,dev['name'],rd,18.0+i/0.2)
que_events += 1
time.sleep(0.05)


time.sleep(3) # This is crucial due to python's "thread"-handling.
ql = 0
has_data = True
while has_data:
try:
que.get(False)
except:
has_data = False
break
que.task_done()
ql += 1

print("Queue length: {}".format(ql))
if ql != que_events:
print("FhemEventQueue contains {} entries, expected {} entries, failure.".format(ql,que_events))
sys.exit(-8)
else:
print("Queue test success, Ok.")
fh.close()
fq.close()
time.sleep(0.5)
print("")

sys.exit(0)

0 comments on commit 8a9d202

Please sign in to comment.