#include "mqtt.h"

namespace config {
  // Values should be defined in config.h
5
  uint16_t sending_interval = MQTT_SENDING_INTERVAL; // [s]
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  //INFO: Listen to every CO2 sensor which is connected to the server:
  //  mosquitto_sub -h MQTT_SERVER -t 'CO2sensors/#' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -v
  const char *mqtt_server = MQTT_SERVER;
  const uint16_t mqtt_port = MQTT_PORT;
  const char *mqtt_user = MQTT_USER;
  const char *mqtt_password = MQTT_PASSWORD;
  const char *fingerprint PROGMEM = MQTT_SERVER_FINGERPRINT;
  const bool allow_mqtt_commands = ALLOW_MQTT_COMMANDS;
  const unsigned long wait_after_fail = 900; // [s] Wait 15 minutes after an MQTT connection fail, before trying again.
}
// On ESP32, WiFiClientSecure is included explicitly.
// NOTE(review): on other targets the class presumably comes in through mqtt.h — confirm.
#if defined(ESP32)
#  include <WiFiClientSecure.h>
#endif
// Network client which is handed over to the MQTT client below.
WiFiClientSecure espClient;
// MQTT client used by every function in namespace mqtt. It communicates through espClient.
PubSubClient mqttClient(espClient);

namespace mqtt {
  // Value of seconds() when publishIfTimeHasCome() last triggered a publish.
  unsigned long last_sent_at = 0;
  // Value of seconds() at the last failed connection attempt. 0 means that no failure is pending.
  unsigned long last_failed_at = 0;
  // Last known state of the MQTT connection. Updated in reconnect().
  bool connected = false;

  // Topic for sensor values. Also used as MQTT client ID, and as prefix for the '/info' and '/control' topics.
  String publish_topic;
  // printf-style template for the JSON sensor payload. Assigned in initialize().
  const char *json_sensor_format;
  // Result of ntp::getLocalTime() at the last successful publish. Empty until then.
  String last_successful_publish = "";

  /**
   * Prepares the MQTT client: stores the topic and the payload template, and sets the server address.
   * No connection is opened here, this happens in reconnect().
   *
   * @param topic Topic on which sensor values will be published. A copy is kept in publish_topic.
   */
  void initialize(String &topic) {
    publish_topic = topic;
    json_sensor_format = PSTR("{\"time\":\"%s\", \"co2\":%d, \"temp\":%.1f, \"rh\":%.1f}");
    // mqttClient.setSocketTimeout(config::mqtt_timeout); //NOTE: somehow doesn't seem to have any effect on connect()
    mqttClient.setServer(config::mqtt_server, config::mqtt_port);
#if defined(ESP8266)
    espClient.setFingerprint(config::fingerprint); // not supported by ESP32
#endif
  }

  void publish(const String &timestamp, int16_t co2, float temperature, float humidity) {
    if (WiFi.status() == WL_CONNECTED && mqttClient.connected()) {
43
      led_effects::onBoardLEDOn();
44
45
46
47
48
49
      Serial.print(F("Publishing MQTT message ... "));

      char payload[75]; // Should be enough for json...
      snprintf(payload, sizeof(payload), json_sensor_format, timestamp.c_str(), co2, temperature, humidity);
      // Topic is the same as clientID. e.g. 'CO2sensors/ESP3d03da'
      if (mqttClient.publish(publish_topic.c_str(), payload)) {
Eric Duminil's avatar
Eric Duminil committed
50
        Serial.println(F("OK"));
51
52
        last_successful_publish = ntp::getLocalTime();
      } else {
Eric Duminil's avatar
Eric Duminil committed
53
        Serial.println(F("Failed."));
54
      }
55
      led_effects::onBoardLEDOff();
56
57
58
59
60
61
62
63
64
65
66
67
    }
  }

  /**
   * Handles the "timer N" command: sets the measurement interval of the sensor to N seconds.
   * Values outside of 2..1800 are ignored silently.
   *
   * @param messageString Complete command, e.g. "timer 30".
   */
  void setTimer(String messageString) {
    messageString.replace("timer ", "");
    const int timestep = messageString.toInt();
    const bool is_valid = (timestep >= 2) && (timestep <= 1800);
    if (!is_valid) {
      return;
    }
    Serial.print(F("Setting Measurement Interval to : "));
    Serial.print(timestep);
    Serial.println("s.");
    // The new interval is applied to the sensor, and saved in config.
    sensor::scd30.setMeasurementInterval(timestep);
    config::measurement_timestep = timestep;
    led_effects::showKITTWheel(color::green, 1);
  }

  /**
   * Handles the "mqtt N" command: sets the time between two MQTT sensor messages to N seconds.
   * N must fit into config::sending_interval (uint16_t) and be at least 1.
   * Any other value is rejected: the interval stays unchanged and a red LED effect is shown.
   *
   * @param messageString Complete command, e.g. "mqtt 900".
   */
  void setMQTTinterval(String messageString) {
    messageString.replace("mqtt ", "");
    const long interval = messageString.toInt();
    // toInt() returns 0 if the text isn't a number, and anything above UINT16_MAX would wrap around
    // when stored as uint16_t. Both cases would lead to an unexpected, very short interval.
    if (interval < 1 || interval > UINT16_MAX) {
      Serial.print(F("Ignoring invalid Sending Interval : "));
      Serial.println(interval);
      led_effects::showKITTWheel(color::red, 1);
      return;
    }
    config::sending_interval = static_cast<uint16_t>(interval);
    Serial.print(F("Setting Sending Interval to : "));
    Serial.print(config::sending_interval);
    Serial.println("s.");
    led_effects::showKITTWheel(color::green, 1);
  }

#ifdef AMPEL_CSV
  /**
   * Handles the "csv N" command: stores N in config::csv_interval, logs it and shows a green LED effect.
   * The value isn't checked.
   *
   * @param messageString Complete command, e.g. "csv 60".
   */
  void setCSVinterval(String messageString) {
    messageString.replace("csv ", "");
    const long interval = messageString.toInt();
    config::csv_interval = interval;

    Serial.print(F("Setting CSV Interval to : "));
    Serial.print(config::csv_interval);
    Serial.println("s.");

    led_effects::showKITTWheel(color::green, 1);
  }
#endif

  /**
   * Handles the "calibrate N" command: saves N as calibration level, and starts the calibration process.
   * Levels outside of 400..2000 ppm are ignored silently.
   *
   * @param messageString Complete command, e.g. "calibrate 700".
   */
  void calibrateSensorToSpecificPPM(String messageString) {
    messageString.replace("calibrate ", "");
    const long int calibrationLevel = messageString.toInt();
    const bool is_valid = (calibrationLevel >= 400) && (calibrationLevel <= 2000);
    if (!is_valid) {
      return;
    }
    Serial.print(F("Force calibration, at "));
    config::co2_calibration_level = calibrationLevel;
    Serial.print(config::co2_calibration_level);
    Serial.println(" ppm.");
    sensor::startCalibrationProcess();
  }

  /**
   * Handles the "co2 N" command: overwrites sensor::co2 with N, for debugging purposes.
   * The value isn't checked.
   *
   * @param messageString Complete command, e.g. "co2 1500".
   */
  void setCO2forDebugging(String messageString) {
    Serial.print(F("DEBUG. Setting CO2 to "));
    messageString.replace("co2 ", "");
    const long fake_co2 = messageString.toInt();
    sensor::co2 = fake_co2;
    Serial.println(sensor::co2);
  }

  void sendInfoAboutLocalNetwork() {
112
    char info_topic[60]; // Should be enough for "CO2sensors/ESPd03cc5/info"
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
    snprintf(info_topic, sizeof(info_topic), "%s/info", publish_topic.c_str());

    char payload[75]; // Should be enough for info json...
    const char *json_info_format = PSTR("{\"local_ip\":\"%s\", \"ssid\":\"%s\"}");
    snprintf(payload, sizeof(payload), json_info_format, WiFi.localIP().toString().c_str(), WiFi.SSID().c_str());

    mqttClient.publish(info_topic, payload);
  }

  /**
   * Allows sensor to be controlled by commands over MQTT
   *
   * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "reset"
   * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "timer 30"
   * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "mqtt 900"
   * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "calibrate 700"
   */
  void controlSensorCallback(char *sub_topic, byte *message, unsigned int length) {
    if (length == 0) {
      return;
    }
134
    led_effects::onBoardLEDOn();
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    Serial.print(F("Message arrived on topic: "));
    Serial.print(sub_topic);
    Serial.print(F(". Message: '"));
    String messageString;
    for (unsigned int i = 0; i < length; i++) {
      Serial.print((char) message[i]);
      messageString += (char) message[i];
    }
    Serial.println("'.");

    if (messageString.startsWith("co2 ")) {
      setCO2forDebugging(messageString);
    } else if (messageString.startsWith("timer ")) {
      setTimer(messageString);
149
150
    } else if (messageString == "calibrate") {
      sensor::startCalibrationProcess();
151
    } else if (messageString.startsWith("calibrate ")) {
152
      calibrateSensorToSpecificPPM(messageString);
153
154
155
156
157
    } else if (messageString.startsWith("mqtt ")) {
      setMQTTinterval(messageString);
    } else if (messageString == "publish") {
      Serial.println(F("Forcing MQTT publish now."));
      publish(sensor::timestamp, sensor::co2, sensor::temperature, sensor::humidity);
158
159
160
#ifdef AMPEL_CSV
    } else if (messageString.startsWith("csv ")) {
      setCSVinterval(messageString);
161
162
    } else if (messageString == "format_filesystem") {
      FS_LIB.format();
163
      led_effects::showKITTWheel(color::blue, 2);
164
#endif
165
    } else if (messageString == "night_mode") {
166
      led_effects::toggleNightMode();
167
168
169
    } else if (messageString == "local_ip") {
      sendInfoAboutLocalNetwork();
    } else if (messageString == "reset") {
Eric Duminil's avatar
Eric Duminil committed
170
      ESP.restart(); // softer than ESP.reset()
171
    } else {
172
      led_effects::showKITTWheel(color::red, 1);
173
174
175
      Serial.println(F("Message not supported. Doing nothing."));
    }
    delay(50);
176
    led_effects::onBoardLEDOff();
177
178
179
180
181
182
183
184
185
186
187
188
189
  }

  void reconnect() {
    if (last_failed_at > 0 && seconds() - last_failed_at < config::wait_after_fail) {
      // It failed less than wait_after_fail ago. Not even trying.
      return;
    }
    if (WiFi.status() != WL_CONNECTED) { //NOTE: Sadly, WiFi.status is sometimes WL_CONNECTED even though it's really not
      // No WIFI
      return;
    }
    Serial.print(F("Attempting MQTT connection..."));

190
    led_effects::onBoardLEDOn();
191
192
    // Wait for connection, at most 15s (default)
    mqttClient.connect(publish_topic.c_str(), config::mqtt_user, config::mqtt_password);
193
    led_effects::onBoardLEDOff();
194

195
196
197
    connected = mqttClient.connected();

    if (connected) {
198
      if (config::allow_mqtt_commands) {
199
        char control_topic[60]; // Should be enough for "CO2sensors/ESPd03cc5/control"
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
        snprintf(control_topic, sizeof(control_topic), "%s/control", publish_topic.c_str());
        mqttClient.subscribe(control_topic);
        mqttClient.setCallback(controlSensorCallback);
      }
      Serial.println(F(" Connected."));
      last_failed_at = 0;
    } else {
      last_failed_at = seconds();
      Serial.print(F(" Failed! Error code="));
      Serial.print(mqttClient.state());
      Serial.print(F(". Will try again in "));
      Serial.print(config::wait_after_fail);
      Serial.println("s.");
    }
  }

Eric Duminil's avatar
Eric Duminil committed
216
  void publishIfTimeHasCome(const String &timeStamp, const int16_t &co2, const float &temp, const float &hum) {
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
    // Send message via MQTT according to sending interval
    unsigned long now = seconds();
    //TODO: Send average since last MQTT message?
    if (now - last_sent_at > config::sending_interval) {
      last_sent_at = now;
      publish(timeStamp, co2, temp, hum);
    }
  }

  /**
   * Keeps the MQTT connection: calls reconnect() if the client isn't connected,
   * and then calls mqttClient.loop() in every case.
   */
  void keepConnection() {
    const bool is_connected = mqttClient.connected();
    if (!is_connected) {
      reconnect();
    }
    mqttClient.loop();
  }

}