// mqtt.cpp 7.61 KiB
#include "mqtt.h"
#include "web_config.h"
#include "led_effects.h"
#include "sensor_console.h"
#include "wifi_util.h"
#include "ntp.h"
#include "src/lib/PubSubClient/src/PubSubClient.h"
#if defined(ESP8266)
#  include <ESP8266WiFi.h>
#elif defined(ESP32)
#  include <WiFi.h>
#endif
namespace config {
  // Values should be defined in config.h or over webconfig.
  //INFO: Listen to every CO2 sensor which is connected to the server:
  //  mosquitto_sub -h MQTT_SERVER -t 'CO2sensors/#' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -v

  // [s] Wait 15 minutes after an MQTT connection fail, before trying again.
  const unsigned long wait_after_fail = 900;
}
#if defined(ESP32)
#  include <WiFiClientSecure.h>
#endif
// Network client used for the MQTT connection. Allocated in mqtt::initialize() as either a
// WiFiClientSecure or a plain WiFiClient, depending on config::mqtt_encryption.
WiFiClient *espClient;
// MQTT client. mqtt::initialize() binds it to espClient and to the configured server and port.
PubSubClient mqttClient;
namespace mqtt {
  // Value of seconds() at the last publish attempt made by publishIfTimeHasCome().
  unsigned long last_sent_at = 0;
  // Value of seconds() at the last failed connection attempt; reset to 0 after a successful connection.
  unsigned long last_failed_at = 0;
  // Whether the MQTT client was connected when this flag was last updated.
  bool connected = false;
  // Topic that measurements are published to. Also passed as client ID when connecting.
  char publish_topic[21]; // e.g. "CO2sensors/ESPxxxxxx\0"
  // printf-style format of the JSON measurement payload. Assigned in initialize().
  const char *json_sensor_format;
  // Filled by ntp::getLocalTime() after each successful publish (presumably a formatted local time).
  char last_successful_publish[23] = "";
  /**
   * Prepares the MQTT module: builds the publish topic, creates the network client
   * (encrypted or not, depending on config::mqtt_encryption), configures the MQTT
   * client with the server and port from config, and registers the MQTT-related
   * console commands.
   *
   * @param sensorId  Identifier appended to "CO2sensors/" to form the publish topic.
   *                  The topic buffer holds 20 characters, so longer ids get truncated.
   */
  void initialize(const char *sensorId) {
    json_sensor_format = PSTR("{\"time\":\"%s\", \"co2\":%d, \"temp\":%.1f, \"rh\":%.1f}");
    snprintf(publish_topic, sizeof(publish_topic), "CO2sensors/%s", sensorId);

    if (config::mqtt_encryption) {
      // The sensor doesn't check the fingerprint of the MQTT broker, because otherwise this fingerprint should be updated
      // on the sensor every 3 months. The connection can still be encrypted, though:
      WiFiClientSecure *secureClient = new WiFiClientSecure();
      secureClient->setInsecure();
      espClient = secureClient;
    } else {
      espClient = new WiFiClient();
    }

    // Must run for both client kinds, i.e. after (not inside) the else branch.
    mqttClient.setClient(*espClient);
    mqttClient.setServer(config::mqtt_server, config::mqtt_port);

    sensor_console::defineIntCommand("mqtt", setMQTTinterval, F("60 (Sets MQTT sending interval, in s)"));
    sensor_console::defineCommand("send_local_ip", sendInfoAboutLocalNetwork,
        F("(Sends local IP and SSID via MQTT. Can be useful to find sensor)"));
  }
  void publish(const char *timestamp, int16_t co2, float temperature, float humidity) {
    if (wifi::connected() && mqttClient.connected()) {
      led_effects::onBoardLEDOn();
      Serial.print(F("MQTT - Publishing message ... "));
      char payload[75]; // Should be enough for json...
      snprintf(payload, sizeof(payload), json_sensor_format, timestamp, co2, temperature, humidity);
      // Topic is the same as clientID. e.g. 'CO2sensors/ESP3d03da'
      if (mqttClient.publish(publish_topic, payload)) {
Serial.println(F("OK")); ntp::getLocalTime(last_successful_publish); } else { Serial.println(F("Failed.")); } led_effects::onBoardLEDOff(); } } /** * Allows sensor to be controlled by commands over MQTT * * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "reset" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "timer 30" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "mqtt 900" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "calibrate 700" */ void controlSensorCallback(char *sub_topic, byte *message, unsigned int length) { if (length == 0) { return; } led_effects::onBoardLEDOn(); Serial.print(F("Message arrived on topic: ")); Serial.println(sub_topic); char command[length + 1]; for (unsigned int i = 0; i < length; i++) { command[i] = message[i]; } command[length] = 0; sensor_console::execute(command); led_effects::onBoardLEDOff(); } void reconnect() { if (last_failed_at > 0 && (seconds() - last_failed_at < config::wait_after_fail)) { // It failed less than wait_after_fail ago. Not even trying. return; } if (!wifi::connected()) { //NOTE: Sadly, WiFi.status is sometimes WL_CONNECTED even though it's really not // No WIFI return; } Serial.print(F("MQTT - Attempting connection to ")); Serial.print(config::mqtt_server); Serial.print(config::mqtt_encryption ? 
F(" (Encrypted") : F(" (Unencrypted")); Serial.print(F(", port ")); Serial.print(config::mqtt_port); Serial.print(F(") ")); Serial.print(F("User:'")); Serial.print(config::mqtt_user); Serial.print(F("' Password:'")); for (size_t i = 0; i < strlen(config::mqtt_password); ++i) { Serial.print("*"); } Serial.print(F("' ...")); led_effects::onBoardLEDOn(); // Wait for connection, at most 15s (default) mqttClient.connect(publish_topic, config::mqtt_user, config::mqtt_password); led_effects::onBoardLEDOff(); connected = mqttClient.connected(); if (connected) { if (config::allow_mqtt_commands) { char control_topic[60]; // Should be enough for "CO2sensors/ESPd03cc5/control" snprintf(control_topic, sizeof(control_topic), "%s/control", publish_topic); mqttClient.subscribe(control_topic); mqttClient.setCallback(controlSensorCallback);
} Serial.println(F(" Connected.")); last_failed_at = 0; } else { // As defined in PubSubClient, between -4 and 5 const __FlashStringHelper *mqtt_statuses[] = { F("Connection timeout"), F("Connection lost"), F( "Connection failed"), F("Disconnected"), F("Connected"), F("Bad protocol"), F("Bad client ID"), F( "Unavailable"), F("Bad credentials"), F("Unauthorized") }; last_failed_at = seconds(); Serial.print(mqtt_statuses[mqttClient.state() + 4]); Serial.print("! (Code="); Serial.print(mqttClient.state()); Serial.print(F("). Will try again in ")); Serial.print(config::wait_after_fail); Serial.println("s."); } } void publishIfTimeHasCome(const char *timestamp, const int16_t &co2, const float &temp, const float &hum) { // Send message via MQTT according to sending interval unsigned long now = seconds(); if (now - last_sent_at > config::mqtt_sending_interval) { last_sent_at = now; publish(timestamp, co2, temp, hum); } } void keepConnection() { // Keep MQTT connection if (!mqttClient.connected()) { reconnect(); } mqttClient.loop(); } /***************************************************************** * Callbacks for sensor commands * *****************************************************************/ void setMQTTinterval(int32_t sending_interval) { config::mqtt_sending_interval = sending_interval; Serial.print(F("Setting MQTT sending interval to : ")); Serial.print(config::mqtt_sending_interval); Serial.println("s."); config::save(); led_effects::showKITTWheel(color::green, 1); } // It can be hard to find the local IP of a sensor if it isn't connected to Serial port, and if mDNS is disabled. // If the sensor can be reach by MQTT, it can answer with info about local_ip and ssid. // The sensor will send the info to "CO2sensors/ESP123456/info". void sendInfoAboutLocalNetwork() { char info_topic[60]; // Should be enough for "CO2sensors/ESP123456/info" snprintf(info_topic, sizeof(info_topic), "%s/info", publish_topic); char payload[75]; // Should be enough for info json... 
const char *json_info_format = PSTR("{\"local_ip\":\"%s\", \"ssid\":\"%s\"}"); snprintf(payload, sizeof(payload), json_info_format, wifi::local_ip, config::selected_ssid()); mqttClient.publish(info_topic, payload); } }