diff --git a/ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp b/ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp index 2b48d2b6b8ff28f5be7ad46a06c2e4f3f389aeae..2619e58e8c0b32c2f2051223140c3d40459e9bc2 100644 --- a/ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp +++ b/ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp @@ -1,161 +1,161 @@ /* - PubSubClient.cpp - A simple client for MQTT. - Nick O'Leary - http://knolleary.net -*/ + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary + http://knolleary.net + */ #include "PubSubClient.h" #include "Arduino.h" PubSubClient::PubSubClient() { - this->_state = MQTT_DISCONNECTED; - this->_client = NULL; - this->stream = NULL; - setCallback(NULL); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} - -PubSubClient::PubSubClient(Client& client) { - this->_state = MQTT_DISCONNECTED; - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} - -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(addr, port); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(addr,port); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(addr, port); - setCallback(callback); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(addr,port); - setCallback(callback); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} - -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(ip, port); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(ip,port); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(ip, port); - setCallback(callback); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(ip,port); - setCallback(callback); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} - -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setCallback(callback); - setClient(client); - this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - this->_state = MQTT_DISCONNECTED; - setServer(domain,port); - setCallback(callback); - setClient(client); - setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); - setKeepAlive(MQTT_KEEPALIVE); - setSocketTimeout(MQTT_SOCKET_TIMEOUT); + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(Client &client) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} + +PubSubClient::PubSubClient(const char *domain, uint16_t port, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain, port); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char *domain, uint16_t port, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain, port); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char *domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain, port); + setCallback(callback); + setClient(client); + this->stream = NULL; + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); +} +PubSubClient::PubSubClient(const char *domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client &client, Stream &stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain, port); + setCallback(callback); + setClient(client); + setStream(stream); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); + setSocketTimeout(MQTT_SOCKET_TIMEOUT); } PubSubClient::~PubSubClient() { @@ -163,424 +163,431 @@ PubSubClient::~PubSubClient() { } boolean PubSubClient::connect(const char *id) { - return connect(id,NULL,NULL,0,0,0,0,1); + return connect(id, NULL, NULL, 0, 0, 0, 0, 1); } boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { - return connect(id,user,pass,0,0,0,0,1); + return connect(id, user, pass, 0, 0, 0, 0, 1); } -boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); +boolean PubSubClient::connect(const char *id, const char *willTopic, uint8_t willQos, boolean willRetain, + const char *willMessage) { + return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage, 1); } -boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char *willTopic, + uint8_t willQos, boolean willRetain, const char *willMessage) { + return connect(id, user, pass, willTopic, willQos, willRetain, willMessage, 1); } -boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { - if (!connected()) { - int result = 0; +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char *willTopic, + uint8_t willQos, boolean willRetain, const char *willMessage, boolean cleanSession) { + if (!connected()) { + int result = 0; + if (_client->connected()) { + result = 1; + } else { + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + } - if(_client->connected()) { - result = 1; - } else { - if (domain != NULL) { - result = _client->connect(this->domain, this->port); - } else { - result = _client->connect(this->ip, this->port); - } - } - - if (result == 1) { - nextMsgId = 1; - // Leave room in the buffer for header and variable length field - uint16_t length = MQTT_MAX_HEADER_SIZE; - unsigned int j; + if (result == 1) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + unsigned int j; #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; #define MQTT_HEADER_VERSION_LENGTH 9 #elif MQTT_VERSION == MQTT_VERSION_3_1_1 - uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; + uint8_t d[7] = { 0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION }; #define MQTT_HEADER_VERSION_LENGTH 7 #endif - for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) { - this->buffer[length++] = d[j]; - } - - uint8_t v; - if (willTopic) { - v = 0x04|(willQos<<3)|(willRetain<<5); - } else { - v = 0x00; - } - if (cleanSession) { - v = v|0x02; - } - - if(user != NULL) { - v = v|0x80; - - if(pass != NULL) { - v = v|(0x80>>1); - } - } - this->buffer[length++] = v; - - this->buffer[length++] = ((this->keepAlive) >> 8); - this->buffer[length++] = ((this->keepAlive) & 0xFF); - - CHECK_STRING_LENGTH(length,id) - length = writeString(id,this->buffer,length); - if (willTopic) { - CHECK_STRING_LENGTH(length,willTopic) - length = writeString(willTopic,this->buffer,length); - CHECK_STRING_LENGTH(length,willMessage) - length = writeString(willMessage,this->buffer,length); - } - - if(user != NULL) { - CHECK_STRING_LENGTH(length,user) - length = writeString(user,this->buffer,length); - if(pass != NULL) { - CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->buffer,length); - } - } + for (j = 0; j < MQTT_HEADER_VERSION_LENGTH; j++) { + this->buffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x04 | (willQos << 3) | (willRetain << 5); + } else { + v = 0x00; + } + if (cleanSession) { + v = v | 0x02; + } + + if (user != NULL) { + v = v | 0x80; + + if (pass != NULL) { + v = v | (0x80 >> 1); + } + } + this->buffer[length++] = v; + + this->buffer[length++] = ((this->keepAlive) >> 8); + this->buffer[length++] = ((this->keepAlive) & 0xFF); + + CHECK_STRING_LENGTH(length, id) + length = writeString(id, this->buffer, length); + if (willTopic) { + CHECK_STRING_LENGTH(length, willTopic) + length = writeString(willTopic, this->buffer, length); + CHECK_STRING_LENGTH(length, willMessage) + length = writeString(willMessage, this->buffer, length); + } + + if (user != NULL) { + CHECK_STRING_LENGTH(length, user) + length = writeString(user, this->buffer, length); + if (pass != NULL) { + CHECK_STRING_LENGTH(length, pass) + length = writeString(pass, this->buffer, length); + } + } - write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + write(MQTTCONNECT, this->buffer, length - MQTT_MAX_HEADER_SIZE); - lastInActivity = lastOutActivity = millis(); + lastInActivity = lastOutActivity = millis(); - while (!_client->available()) { - unsigned long t = millis(); - if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) { - _state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; - } - } - uint8_t llen; - uint32_t len = readPacket(&llen); - - if (len == 4) { - if (buffer[3] == 0) { - lastInActivity = millis(); - pingOutstanding = false; - _state = MQTT_CONNECTED; - return true; - } else { - _state = buffer[3]; - } - } - _client->stop(); + while (!_client->available()) { + unsigned long t = millis(); + if (t - lastInActivity >= ((int32_t) this->socketTimeout * 1000UL)) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } + } + uint8_t llen; + uint32_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; } else { - _state = MQTT_CONNECT_FAILED; + _state = buffer[3]; } - return false; + } + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; } - return true; + return false; + } + return true; } // reads a byte into result -boolean PubSubClient::readByte(uint8_t * result) { - uint32_t previousMillis = millis(); - while(!_client->available()) { - yield(); - uint32_t currentMillis = millis(); - if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){ - return false; - } - } - *result = _client->read(); - return true; +boolean PubSubClient::readByte(uint8_t *result) { + uint32_t previousMillis = millis(); + while (!_client->available()) { + yield(); + uint32_t currentMillis = millis(); + if (currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)) { + return false; + } + } + *result = _client->read(); + return true; } // reads a byte into result[*index] and increments index -boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ +boolean PubSubClient::readByte(uint8_t *result, uint16_t *index) { uint16_t current_index = *index; - uint8_t * write_address = &(result[current_index]); - if(readByte(write_address)){ + uint8_t *write_address = &(result[current_index]); + if (readByte(write_address)) { *index = current_index + 1; return true; } return false; } -uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { - uint16_t len = 0; - if(!readByte(this->buffer, &len)) return 0; - bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; - uint32_t multiplier = 1; - uint32_t length = 0; - uint8_t digit = 0; - uint16_t skip = 0; - uint32_t start = 0; - - do { - if (len == 5) { - // Invalid remaining length encoding - kill the connection - _state = MQTT_DISCONNECTED; - _client->stop(); - return 0; - } - if(!readByte(&digit)) return 0; - this->buffer[len++] = digit; - length += (digit & 127) * multiplier; - multiplier <<=7; //multiplier *= 128 - } while ((digit & 128) != 0); - *lengthLength = len-1; - - if (isPublish) { - // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(this->buffer, &len)) return 0; - if(!readByte(this->buffer, &len)) return 0; - skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; - start = 2; - if (this->buffer[0]&MQTTQOS1) { - // skip message id - skip += 2; - } +uint32_t PubSubClient::readPacket(uint8_t *lengthLength) { + uint16_t len = 0; + if (!readByte(this->buffer, &len)) + return 0; + bool isPublish = (this->buffer[0] & 0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint32_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint32_t start = 0; + + do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + _state = MQTT_DISCONNECTED; + _client->stop(); + return 0; } - uint32_t idx = len; - - for (uint32_t i = start;i<length;i++) { - if(!readByte(&digit)) return 0; - if (this->stream) { - if (isPublish && idx-*lengthLength-2>skip) { - this->stream->write(digit); - } - } - - if (len < this->bufferSize) { - this->buffer[len] = digit; - len++; - } - idx++; + if (!readByte(&digit)) + return 0; + this->buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier <<= 7; //multiplier *= 128 + } while ((digit & 128) != 0); + *lengthLength = len - 1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + if (!readByte(this->buffer, &len)) + return 0; + if (!readByte(this->buffer, &len)) + return 0; + skip = (this->buffer[*lengthLength + 1] << 8) + this->buffer[*lengthLength + 2]; + start = 2; + if (this->buffer[0] & MQTTQOS1) { + // skip message id + skip += 2; + } + } + uint32_t idx = len; + + for (uint32_t i = start; i < length; i++) { + if (!readByte(&digit)) + return 0; + if (this->stream) { + if (isPublish && idx - *lengthLength - 2 > skip) { + this->stream->write(digit); + } } - if (!this->stream && idx > this->bufferSize) { - len = 0; // This will cause the packet to be ignored. + if (len < this->bufferSize) { + this->buffer[len] = digit; + len++; } - return len; + idx++; + } + + if (!this->stream && idx > this->bufferSize) { + len = 0; // This will cause the packet to be ignored. + } + return len; } boolean PubSubClient::loop() { - if (connected()) { - unsigned long t = millis(); - if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { - if (pingOutstanding) { - this->_state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > this->keepAlive * 1000UL) || (t - lastOutActivity > this->keepAlive * 1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + this->buffer[0] = MQTTPINGREQ; + this->buffer[1] = 0; + _client->write(this->buffer, 2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = this->buffer[0] & 0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (this->buffer[llen + 1] << 8) + this->buffer[llen + 2]; /* topic length in bytes */ + memmove(this->buffer + llen + 2, this->buffer + llen + 3, tl); /* move topic inside buffer 1 byte to front */ + this->buffer[llen + 2 + tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) this->buffer + llen + 2; + // msgId only present for QOS>0 + if ((this->buffer[0] & 0x06) == MQTTQOS1) { + msgId = (this->buffer[llen + 3 + tl] << 8) + this->buffer[llen + 3 + tl + 1]; + payload = this->buffer + llen + 3 + tl + 2; + callback(topic, payload, len - llen - 3 - tl - 2); + + this->buffer[0] = MQTTPUBACK; + this->buffer[1] = 2; + this->buffer[2] = (msgId >> 8); + this->buffer[3] = (msgId & 0xFF); + _client->write(this->buffer, 4); + lastOutActivity = t; + } else { - this->buffer[0] = MQTTPINGREQ; - this->buffer[1] = 0; - _client->write(this->buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = this->buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ - memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) this->buffer+llen+2; - // msgId only present for QOS>0 - if ((this->buffer[0]&0x06) == MQTTQOS1) { - msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; - payload = this->buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - this->buffer[0] = MQTTPUBACK; - this->buffer[1] = 2; - this->buffer[2] = (msgId >> 8); - this->buffer[3] = (msgId & 0xFF); - _client->write(this->buffer,4); - lastOutActivity = t; - - } else { - payload = this->buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - this->buffer[0] = MQTTPINGRESP; - this->buffer[1] = 0; - _client->write(this->buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; - } - } else if (!connected()) { - // readPacket has closed the connection - return false; + payload = this->buffer + llen + 3 + tl; + callback(topic, payload, len - llen - 3 - tl); } + } + } else if (type == MQTTPINGREQ) { + this->buffer[0] = MQTTPINGRESP; + this->buffer[1] = 0; + _client->write(this->buffer, 2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; } - return true; + } else if (!connected()) { + // readPacket has closed the connection + return false; + } } - return false; + return true; + } + return false; } -boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); +boolean PubSubClient::publish(const char *topic, const char *payload) { + return publish(topic, (const uint8_t*) payload, payload ? strnlen(payload, this->bufferSize) : 0, false); } -boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); +boolean PubSubClient::publish(const char *topic, const char *payload, boolean retained) { + return publish(topic, (const uint8_t*) payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { - return publish(topic, payload, plength, false); +boolean PubSubClient::publish(const char *topic, const uint8_t *payload, unsigned int plength) { + return publish(topic, payload, plength, false); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { - if (connected()) { - if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { - // Too long - return false; - } - // Leave room in the buffer for header and variable length field - uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); - - // Add payload - uint16_t i; - for (i=0;i<plength;i++) { - this->buffer[length++] = payload[i]; - } +boolean PubSubClient::publish(const char *topic, const uint8_t *payload, unsigned int plength, boolean retained) { + if (connected()) { + if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->bufferSize) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic, this->buffer, length); + + // Add payload + uint16_t i; + for (i = 0; i < plength; i++) { + this->buffer[length++] = payload[i]; + } - // Write the header - uint8_t header = MQTTPUBLISH; - if (retained) { - header |= 1; - } - return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + // Write the header + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; } - return false; + return write(header, this->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; } -boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { - return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); +boolean PubSubClient::publish_P(const char *topic, const char *payload, boolean retained) { + return publish_P(topic, (const uint8_t*) payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); } -boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { - uint8_t llen = 0; - uint8_t digit; - unsigned int rc = 0; - uint16_t tlen; - unsigned int pos = 0; - unsigned int i; - uint8_t header; - unsigned int len; - int expectedLength; +boolean PubSubClient::publish_P(const char *topic, const uint8_t *payload, unsigned int plength, boolean retained) { + uint8_t llen = 0; + uint8_t digit; + unsigned int rc = 0; + uint16_t tlen; + unsigned int pos = 0; + unsigned int i; + uint8_t header; + unsigned int len; + uint32_t expectedLength; - if (!connected()) { - return false; - } + if (!connected()) { + return false; + } - tlen = strnlen(topic, this->bufferSize); + tlen = strnlen(topic, this->bufferSize); - header = MQTTPUBLISH; - if (retained) { - header |= 1; + header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + this->buffer[pos++] = header; + len = plength + 2 + tlen; + do { + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; } - this->buffer[pos++] = header; - len = plength + 2 + tlen; - do { - digit = len & 127; //digit = len %128 - len >>= 7; //len = len / 128 - if (len > 0) { - digit |= 0x80; - } - this->buffer[pos++] = digit; - llen++; - } while(len>0); + this->buffer[pos++] = digit; + llen++; + } while (len > 0); - pos = writeString(topic,this->buffer,pos); + pos = writeString(topic, this->buffer, pos); - rc += _client->write(this->buffer,pos); + rc += _client->write(this->buffer, pos); - for (i=0;i<plength;i++) { - rc += _client->write((char)pgm_read_byte_near(payload + i)); - } + for (i = 0; i < plength; i++) { + rc += _client->write((char) pgm_read_byte_near(payload + i)); + } - lastOutActivity = millis(); + lastOutActivity = millis(); - expectedLength = 1 + llen + 2 + tlen + plength; + expectedLength = 1 + llen + 2 + tlen + plength; - return (rc == expectedLength); + return (rc == expectedLength); } -boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { - if (connected()) { - // Send the header and variable length field - uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); - uint8_t header = MQTTPUBLISH; - if (retained) { - header |= 1; - } - size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); - uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); - lastOutActivity = millis(); - return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); +boolean PubSubClient::beginPublish(const char *topic, unsigned int plength, boolean retained) { + if (connected()) { + // Send the header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic, this->buffer, length); + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; } - return false; + size_t hlen = buildHeader(header, this->buffer, plength + length - MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->buffer + (MQTT_MAX_HEADER_SIZE - hlen), length - (MQTT_MAX_HEADER_SIZE - hlen)); + lastOutActivity = millis(); + return (rc == (length - (MQTT_MAX_HEADER_SIZE - hlen))); + } + return false; } int PubSubClient::endPublish() { - return 1; + return 1; } size_t PubSubClient::write(uint8_t data) { - lastOutActivity = millis(); - return _client->write(data); + lastOutActivity = millis(); + return _client->write(data); } size_t PubSubClient::write(const uint8_t *buffer, size_t size) { - lastOutActivity = millis(); - return _client->write(buffer,size); -} - -size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { - uint8_t lenBuf[4]; - uint8_t llen = 0; - uint8_t digit; - uint8_t pos = 0; - uint16_t len = length; - do { - - digit = len & 127; //digit = len %128 - len >>= 7; //len = len / 128 - if (len > 0) { - digit |= 0x80; - } - lenBuf[pos++] = digit; - llen++; - } while(len>0); - - buf[4-llen] = header; - for (int i=0;i<llen;i++) { - buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i]; + lastOutActivity = millis(); + return _client->write(buffer, size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t *buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint16_t len = length; + do { + + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 + if (len > 0) { + digit |= 0x80; } - return llen+1; // Full header size is variable length bit plus the 1-byte fixed header + lenBuf[pos++] = digit; + llen++; + } while (len > 0); + + buf[4 - llen] = header; + for (int i = 0; i < llen; i++) { + buf[MQTT_MAX_HEADER_SIZE - llen + i] = lenBuf[i]; + } + return llen + 1; // Full header size is variable length bit plus the 1-byte fixed header } -boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { - uint16_t rc; - uint8_t hlen = buildHeader(header, buf, length); +boolean PubSubClient::write(uint8_t header, uint8_t *buf, uint16_t length) { + uint16_t rc; + uint8_t hlen = buildHeader(header, buf, length); #ifdef MQTT_MAX_TRANSFER_SIZE uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen); @@ -596,174 +603,173 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } return result; #else - rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); - lastOutActivity = millis(); - return (rc == hlen+length); + rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hlen), length + hlen); + lastOutActivity = millis(); + return (rc == hlen + length); #endif } -boolean PubSubClient::subscribe(const char* topic) { - return subscribe(topic, 0); +boolean PubSubClient::subscribe(const char *topic) { + return subscribe(topic, 0); } -boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - size_t topicLength = strnlen(topic, this->bufferSize); - if (topic == 0) { - return false; - } - if (qos > 1) { - return false; - } - if (this->bufferSize < 9 + topicLength) { - // Too long - return false; - } - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = MQTT_MAX_HEADER_SIZE; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, this->buffer,length); - this->buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); - } +boolean PubSubClient::subscribe(const char *topic, uint8_t qos) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (qos > 1) { return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*) topic, this->buffer, length); + this->buffer[length++] = qos; + return write(MQTTSUBSCRIBE | MQTTQOS1, this->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; } -boolean PubSubClient::unsubscribe(const char* topic) { - size_t topicLength = strnlen(topic, this->bufferSize); - if (topic == 0) { - return false; - } - if (this->bufferSize < 9 + topicLength) { - // Too long - return false; - } - if (connected()) { - uint16_t length = MQTT_MAX_HEADER_SIZE; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, this->buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); - } +boolean PubSubClient::unsubscribe(const char *topic) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { return false; + } + if (this->bufferSize < 9 + topicLength) { + // Too long + return false; + } + if (connected()) { + uint16_t length = MQTT_MAX_HEADER_SIZE; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->buffer, length); + return write(MQTTUNSUBSCRIBE | MQTTQOS1, this->buffer, length - MQTT_MAX_HEADER_SIZE); + } + return false; } void PubSubClient::disconnect() { - this->buffer[0] = MQTTDISCONNECT; - this->buffer[1] = 0; - _client->write(this->buffer,2); - _state = MQTT_DISCONNECTED; - _client->flush(); - _client->stop(); - lastInActivity = lastOutActivity = millis(); -} - -uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { - const char* idp = string; - uint16_t i = 0; - pos += 2; - while (*idp) { - buf[pos++] = *idp++; - i++; - } - buf[pos-i-2] = (i >> 8); - buf[pos-i-1] = (i & 0xFF); - return pos; + this->buffer[0] = MQTTDISCONNECT; + this->buffer[1] = 0; + _client->write(this->buffer, 2); + _state = MQTT_DISCONNECTED; + _client->flush(); + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(const char *string, uint8_t *buf, uint16_t pos) { + const char *idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos - i - 2] = (i >> 8); + buf[pos - i - 1] = (i & 0xFF); + return pos; } - boolean PubSubClient::connected() { - boolean rc; - if (_client == NULL ) { - rc = false; + boolean rc; + if (_client == NULL) { + rc = false; + } else { + rc = (int) _client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + _client->flush(); + _client->stop(); + } } else { - rc = (int)_client->connected(); - if (!rc) { - if (this->_state == MQTT_CONNECTED) { - this->_state = MQTT_CONNECTION_LOST; - _client->flush(); - _client->stop(); - } - } else { - return this->_state == MQTT_CONNECTED; - } + return this->_state == MQTT_CONNECTED; } - return rc; + } + return rc; } -PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { - IPAddress addr(ip[0],ip[1],ip[2],ip[3]); - return setServer(addr,port); +PubSubClient& PubSubClient::setServer(uint8_t *ip, uint16_t port) { + IPAddress addr(ip[0], ip[1], ip[2], ip[3]); + return setServer(addr, port); } PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { - this->ip = ip; - this->port = port; - this->domain = NULL; - return *this; + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; } -PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { - this->domain = domain; - this->port = port; - return *this; +PubSubClient& PubSubClient::setServer(const char *domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; } PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { - this->callback = callback; - return *this; + this->callback = callback; + return *this; } -PubSubClient& PubSubClient::setClient(Client& client){ - this->_client = &client; - return *this; +PubSubClient& PubSubClient::setClient(Client &client) { + this->_client = &client; + return *this; } -PubSubClient& PubSubClient::setStream(Stream& stream){ - this->stream = &stream; - return *this; +PubSubClient& PubSubClient::setStream(Stream &stream) { + this->stream = &stream; + return *this; } int PubSubClient::state() { - return this->_state; + return this->_state; } boolean PubSubClient::setBufferSize(uint16_t size) { - if (size == 0) { - // Cannot set it back to 0 - return false; - } - if (this->bufferSize == 0) { - this->buffer = (uint8_t*)malloc(size); + if (size == 0) { + // Cannot set it back to 0 + return false; + } + if (this->bufferSize == 0) { + this->buffer = (uint8_t*) malloc(size); + } else { + uint8_t *newBuffer = (uint8_t*) realloc(this->buffer, size); + if (newBuffer != NULL) { + this->buffer = newBuffer; } else { - uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); - if (newBuffer != NULL) { - this->buffer = newBuffer; - } else { - return false; - } + return false; } - this->bufferSize = size; - return (this->buffer != NULL); + } + this->bufferSize = size; + return (this->buffer != NULL); } uint16_t PubSubClient::getBufferSize() { - return this->bufferSize; + return this->bufferSize; } PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { - this->keepAlive = keepAlive; - return *this; + this->keepAlive = keepAlive; + return *this; } PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { - this->socketTimeout = timeout; - return *this; + this->socketTimeout = timeout; + return *this; }