diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp old mode 100755 new mode 100644 index 5932bdbe..89d0f059 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -7,30 +7,54 @@ #include "PubSubClient.h" #include "Arduino.h" +#ifdef ESP8266 + #define INIT_FINGERPRINT() this->fingerprint = NULL; +#else + #define INIT_FINGERPRINT() +#endif + PubSubClient::PubSubClient() { this->_state = MQTT_DISCONNECTED; this->_client = NULL; this->stream = NULL; setCallback(NULL); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } +#ifdef ESP8266 +PubSubClient::PubSubClient(WiFiClientSecure& client, const char* fingerprint) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; + this->_available = 0; + this->fingerprint = fingerprint; +} +#endif + PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -38,6 +62,8 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -45,6 +71,8 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { @@ -52,12 +80,16 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { setServer(ip, port); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -65,6 +97,8 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -72,6 +106,8 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { @@ -79,12 +115,16 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { setServer(domain,port); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -92,6 +132,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); this->stream = NULL; + this->_available = 0; + INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -99,6 +141,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); setStream(stream); + this->_available = 0; + INIT_FINGERPRINT() } boolean PubSubClient::connect(const char *id) { @@ -122,6 +166,29 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } else { result = _client->connect(this->ip, this->port); } + +#ifdef ESP8266 + if (fingerprint != NULL) { + if (domain != NULL) { + // there's only one way to set fingerprint: using the WiFiClientSecure-based constructor, so this cast is safe + if (!static_cast(_client)->verify(fingerprint, domain)) { + _state = MQTT_TLS_BAD_SERVER_CREDENTIALS; + return false; + } + } + else { + char buffer[16]; // IPv4 only (which is what IPAddress supports anyway) + + ip.toString().toCharArray(buffer, 16); + + if (!static_cast(_client)->verify(fingerprint, buffer)) { + _state = MQTT_TLS_BAD_SERVER_CREDENTIALS; + return false; + } + } + } +#endif + if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field @@ -175,7 +242,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass lastInActivity = lastOutActivity = millis(); - while (!_client->available()) { + while (!available()) { unsigned long t = millis(); if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { _state = MQTT_CONNECTION_TIMEOUT; @@ -205,16 +272,26 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass return true; } +// return and cache the available number of bytes in the client; +// remember to reduce the available count when consuming the buffer +int PubSubClient::available() { + if (_available == 0) { + _available = _client->available(); + } + return _available; +} + // reads a byte into result boolean PubSubClient::readByte(uint8_t * result) { uint32_t previousMillis = millis(); - while(!_client->available()) { + while(!available()) { uint32_t currentMillis = millis(); if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ return false; } } *result = _client->read(); + _available -= 1; return true; } @@ -281,62 +358,67 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { boolean PubSubClient::loop() { if (connected()) { - unsigned long t = millis(); - if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { - if (pingOutstanding) { - this->_state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; - } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; + do { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ - memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) buffer+llen+2; - // msgId only present for QOS>0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); - lastOutActivity = t; - - } else { - payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); + + if (available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ + memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ + buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) buffer+llen+2; + // msgId only present for QOS>0 + if ((buffer[0]&0x06) == MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + _client->write(buffer,4); + lastOutActivity = t; + + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; } - } else if (type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; } } - } + } while (_available > 0); // can't leave data in the buffer, or subsequent publish() calls + // may fail (axTLS is only half-duplex, so writes will fail, to + // avoid losing information) return true; } return false; diff --git a/src/PubSubClient.h b/src/PubSubClient.h old mode 100755 new mode 100644 index be4bd674..16bf8d31 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -12,6 +12,10 @@ #include "Client.h" #include "Stream.h" +#ifdef ESP8266 +#include "WiFiClientSecure.h" +#endif + #define MQTT_VERSION_3_1 3 #define MQTT_VERSION_3_1_1 4 @@ -42,16 +46,17 @@ //#define MQTT_MAX_TRANSFER_SIZE 80 // Possible values for client.state() -#define MQTT_CONNECTION_TIMEOUT -4 -#define MQTT_CONNECTION_LOST -3 -#define MQTT_CONNECT_FAILED -2 -#define MQTT_DISCONNECTED -1 -#define MQTT_CONNECTED 0 -#define MQTT_CONNECT_BAD_PROTOCOL 1 -#define MQTT_CONNECT_BAD_CLIENT_ID 2 -#define MQTT_CONNECT_UNAVAILABLE 3 -#define MQTT_CONNECT_BAD_CREDENTIALS 4 -#define MQTT_CONNECT_UNAUTHORIZED 5 +#define MQTT_TLS_BAD_SERVER_CREDENTIALS -5 +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 #define MQTTCONNECT 1 << 4 // Client request to connect to Server #define MQTTCONNACK 2 << 4 // Connect Acknowledgment @@ -87,8 +92,10 @@ class PubSubClient { uint16_t nextMsgId; unsigned long lastOutActivity; unsigned long lastInActivity; + int _available; bool pingOutstanding; MQTT_CALLBACK_SIGNATURE; + int available(); uint16_t readPacket(uint8_t*); boolean readByte(uint8_t * result); boolean readByte(uint8_t * result, uint16_t * index); @@ -99,6 +106,11 @@ class PubSubClient { uint16_t port; Stream* stream; int _state; + +#ifdef ESP8266 + const char* fingerprint; +#endif + public: PubSubClient(); PubSubClient(Client& client); @@ -114,6 +126,10 @@ class PubSubClient { PubSubClient(const char*, uint16_t, Client& client, Stream&); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + +#ifdef ESP8266 + PubSubClient(WiFiClientSecure& client, const char* fingerprint); +#endif PubSubClient& setServer(IPAddress ip, uint16_t port); PubSubClient& setServer(uint8_t * ip, uint16_t port);