Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 135 additions & 53 deletions src/PubSubClient.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -7,98 +7,142 @@
#include "PubSubClient.h"
#include "Arduino.h"

#ifdef ESP8266
#define INIT_FINGERPRINT() this->fingerprint = NULL;
#else
#define INIT_FINGERPRINT()
#endif

PubSubClient::PubSubClient() {
this->_state = MQTT_DISCONNECTED;
this->_client = NULL;
this->stream = NULL;
setCallback(NULL);
this->_available = 0;
INIT_FINGERPRINT()
}

PubSubClient::PubSubClient(Client& client) {
this->_state = MQTT_DISCONNECTED;
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}

#ifdef ESP8266
PubSubClient::PubSubClient(WiFiClientSecure& client, const char* fingerprint) {
this->_state = MQTT_DISCONNECTED;
setClient(client);
this->stream = NULL;
this->_available = 0;
this->fingerprint = fingerprint;
}
#endif

PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(addr, port);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(addr,port);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(addr, port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(addr,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}

PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(ip, port);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(ip,port);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(ip, port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(ip,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}

PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setCallback(callback);
setClient(client);
this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
}
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED;
setServer(domain,port);
setCallback(callback);
setClient(client);
setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
}

boolean PubSubClient::connect(const char *id) {
Expand All @@ -122,6 +166,29 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
} else {
result = _client->connect(this->ip, this->port);
}

#ifdef ESP8266
if (fingerprint != NULL) {
if (domain != NULL) {
// there's only one way to set fingerprint: using the WiFiClientSecure-based constructor, so this cast is safe
if (!static_cast<WiFiClientSecure*>(_client)->verify(fingerprint, domain)) {
_state = MQTT_TLS_BAD_SERVER_CREDENTIALS;
return false;
}
}
else {
char buffer[16]; // IPv4 only (which is what IPAddress supports anyway)

ip.toString().toCharArray(buffer, 16);

if (!static_cast<WiFiClientSecure*>(_client)->verify(fingerprint, buffer)) {
_state = MQTT_TLS_BAD_SERVER_CREDENTIALS;
return false;
}
}
}
#endif

if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
Expand Down Expand Up @@ -175,7 +242,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass

lastInActivity = lastOutActivity = millis();

while (!_client->available()) {
while (!available()) {
unsigned long t = millis();
if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
_state = MQTT_CONNECTION_TIMEOUT;
Expand Down Expand Up @@ -205,16 +272,26 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
return true;
}

// return and cache the available number of bytes in the client;
// remember to reduce the available count when consuming the buffer
int PubSubClient::available() {
if (_available == 0) {
_available = _client->available();
}
return _available;
}

// reads a byte into result
boolean PubSubClient::readByte(uint8_t * result) {
uint32_t previousMillis = millis();
while(!_client->available()) {
while(!available()) {
uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
return false;
}
}
*result = _client->read();
_available -= 1;
return true;
}

Expand Down Expand Up @@ -281,62 +358,67 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {

boolean PubSubClient::loop() {
if (connected()) {
unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
if (pingOutstanding) {
this->_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
return false;
} else {
buffer[0] = MQTTPINGREQ;
buffer[1] = 0;
_client->write(buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
do {
unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
if (pingOutstanding) {
this->_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
return false;
} else {
buffer[0] = MQTTPINGREQ;
buffer[1] = 0;
_client->write(buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);

buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client->write(buffer,4);
lastOutActivity = t;

} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);

if (available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);

buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client->write(buffer,4);
lastOutActivity = t;

} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
}
}
} while (_available > 0); // can't leave data in the buffer, or subsequent publish() calls
// may fail (axTLS is only half-duplex, so writes will fail, to
// avoid losing information)
return true;
}
return false;
Expand Down
Loading