Skip to content
GitLab
Explore
Projects
Groups
Snippets
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
co2ampel
ampel-firmware
Commits
2b8aa199
Commit
2b8aa199
authored
3 years ago
by
Eric Duminil
Browse files
Options
Download
Email Patches
Plain Diff
Trying to remove warnings
parent
88944cef
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp
+612
-606
ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp
with
612 additions
and
606 deletions
+612
-606
ampel-firmware/src/lib/PubSubClient/src/PubSubClient.cpp
+
612
-
606
View file @
2b8aa199
/*
/*
PubSubClient.cpp - A simple client for MQTT.
PubSubClient.cpp - A simple client for MQTT.
Nick O'Leary
Nick O'Leary
http://knolleary.net
http://knolleary.net
*/
*/
#include
"PubSubClient.h"
#include
"PubSubClient.h"
#include
"Arduino.h"
#include
"Arduino.h"
PubSubClient
::
PubSubClient
()
{
PubSubClient
::
PubSubClient
()
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_client
=
NULL
;
this
->
_client
=
NULL
;
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
setCallback
(
NULL
);
setCallback
(
NULL
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
addr
,
port
);
setServer
(
addr
,
port
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
addr
,
port
);
setServer
(
addr
,
port
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
addr
,
port
);
setServer
(
addr
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
IPAddress
addr
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
addr
,
port
);
setServer
(
addr
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
ip
,
port
);
setServer
(
ip
,
port
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
ip
,
port
);
setServer
(
ip
,
port
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
ip
,
port
);
setServer
(
ip
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
uint8_t
*
ip
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
ip
,
port
);
setServer
(
ip
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
domain
,
port
);
setServer
(
domain
,
port
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
domain
,
port
);
setServer
(
domain
,
port
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
domain
,
port
);
setServer
(
domain
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
this
->
stream
=
NULL
;
this
->
stream
=
NULL
;
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
PubSubClient
::
PubSubClient
(
const
char
*
domain
,
uint16_t
port
,
MQTT_CALLBACK_SIGNATURE
,
Client
&
client
,
Stream
&
stream
)
{
this
->
_state
=
MQTT_DISCONNECTED
;
this
->
_state
=
MQTT_DISCONNECTED
;
setServer
(
domain
,
port
);
setServer
(
domain
,
port
);
setCallback
(
callback
);
setCallback
(
callback
);
setClient
(
client
);
setClient
(
client
);
setStream
(
stream
);
setStream
(
stream
);
this
->
bufferSize
=
0
;
this
->
bufferSize
=
0
;
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setBufferSize
(
MQTT_MAX_PACKET_SIZE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setKeepAlive
(
MQTT_KEEPALIVE
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
setSocketTimeout
(
MQTT_SOCKET_TIMEOUT
);
}
}
PubSubClient
::~
PubSubClient
()
{
PubSubClient
::~
PubSubClient
()
{
...
@@ -163,424 +163,431 @@ PubSubClient::~PubSubClient() {
...
@@ -163,424 +163,431 @@ PubSubClient::~PubSubClient() {
}
}
boolean
PubSubClient
::
connect
(
const
char
*
id
)
{
boolean
PubSubClient
::
connect
(
const
char
*
id
)
{
return
connect
(
id
,
NULL
,
NULL
,
0
,
0
,
0
,
0
,
1
);
return
connect
(
id
,
NULL
,
NULL
,
0
,
0
,
0
,
0
,
1
);
}
}
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
)
{
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
)
{
return
connect
(
id
,
user
,
pass
,
0
,
0
,
0
,
0
,
1
);
return
connect
(
id
,
user
,
pass
,
0
,
0
,
0
,
0
,
1
);
}
}
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
willTopic
,
uint8_t
willQos
,
boolean
willRetain
,
const
char
*
willMessage
)
{
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
willTopic
,
uint8_t
willQos
,
boolean
willRetain
,
return
connect
(
id
,
NULL
,
NULL
,
willTopic
,
willQos
,
willRetain
,
willMessage
,
1
);
const
char
*
willMessage
)
{
return
connect
(
id
,
NULL
,
NULL
,
willTopic
,
willQos
,
willRetain
,
willMessage
,
1
);
}
}
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
willTopic
,
uint8_t
willQos
,
boolean
willRetain
,
const
char
*
willMessage
)
{
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
willTopic
,
return
connect
(
id
,
user
,
pass
,
willTopic
,
willQos
,
willRetain
,
willMessage
,
1
);
uint8_t
willQos
,
boolean
willRetain
,
const
char
*
willMessage
)
{
return
connect
(
id
,
user
,
pass
,
willTopic
,
willQos
,
willRetain
,
willMessage
,
1
);
}
}
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
willTopic
,
uint8_t
willQos
,
boolean
willRetain
,
const
char
*
willMessage
,
boolean
cleanSession
)
{
boolean
PubSubClient
::
connect
(
const
char
*
id
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
willTopic
,
if
(
!
connected
())
{
uint8_t
willQos
,
boolean
willRetain
,
const
char
*
willMessage
,
boolean
cleanSession
)
{
int
result
=
0
;
if
(
!
connected
())
{
int
result
=
0
;
if
(
_client
->
connected
())
{
result
=
1
;
}
else
{
if
(
domain
!=
NULL
)
{
result
=
_client
->
connect
(
this
->
domain
,
this
->
port
);
}
else
{
result
=
_client
->
connect
(
this
->
ip
,
this
->
port
);
}
}
if
(
_client
->
connected
())
{
if
(
result
==
1
)
{
result
=
1
;
nextMsgId
=
1
;
}
else
{
// Leave room in the buffer for header and variable length field
if
(
domain
!=
NULL
)
{
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
result
=
_client
->
connect
(
this
->
domain
,
this
->
port
);
unsigned
int
j
;
}
else
{
result
=
_client
->
connect
(
this
->
ip
,
this
->
port
);
}
}
if
(
result
==
1
)
{
nextMsgId
=
1
;
// Leave room in the buffer for header and variable length field
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
unsigned
int
j
;
#if MQTT_VERSION == MQTT_VERSION_3_1
#if MQTT_VERSION == MQTT_VERSION_3_1
uint8_t
d
[
9
]
=
{
0x00
,
0x06
,
'M'
,
'Q'
,
'I'
,
's'
,
'd'
,
'p'
,
MQTT_VERSION
};
uint8_t
d
[
9
]
=
{
0x00
,
0x06
,
'M'
,
'Q'
,
'I'
,
's'
,
'd'
,
'p'
,
MQTT_VERSION
};
#define MQTT_HEADER_VERSION_LENGTH 9
#define MQTT_HEADER_VERSION_LENGTH 9
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
uint8_t
d
[
7
]
=
{
0x00
,
0x04
,
'M'
,
'Q'
,
'T'
,
'T'
,
MQTT_VERSION
};
uint8_t
d
[
7
]
=
{
0x00
,
0x04
,
'M'
,
'Q'
,
'T'
,
'T'
,
MQTT_VERSION
};
#define MQTT_HEADER_VERSION_LENGTH 7
#define MQTT_HEADER_VERSION_LENGTH 7
#endif
#endif
for
(
j
=
0
;
j
<
MQTT_HEADER_VERSION_LENGTH
;
j
++
)
{
for
(
j
=
0
;
j
<
MQTT_HEADER_VERSION_LENGTH
;
j
++
)
{
this
->
buffer
[
length
++
]
=
d
[
j
];
this
->
buffer
[
length
++
]
=
d
[
j
];
}
}
uint8_t
v
;
uint8_t
v
;
if
(
willTopic
)
{
if
(
willTopic
)
{
v
=
0x04
|
(
willQos
<<
3
)
|
(
willRetain
<<
5
);
v
=
0x04
|
(
willQos
<<
3
)
|
(
willRetain
<<
5
);
}
else
{
}
else
{
v
=
0x00
;
v
=
0x00
;
}
}
if
(
cleanSession
)
{
if
(
cleanSession
)
{
v
=
v
|
0x02
;
v
=
v
|
0x02
;
}
}
if
(
user
!=
NULL
)
{
if
(
user
!=
NULL
)
{
v
=
v
|
0x80
;
v
=
v
|
0x80
;
if
(
pass
!=
NULL
)
{
if
(
pass
!=
NULL
)
{
v
=
v
|
(
0x80
>>
1
);
v
=
v
|
(
0x80
>>
1
);
}
}
}
}
this
->
buffer
[
length
++
]
=
v
;
this
->
buffer
[
length
++
]
=
v
;
this
->
buffer
[
length
++
]
=
((
this
->
keepAlive
)
>>
8
);
this
->
buffer
[
length
++
]
=
((
this
->
keepAlive
)
>>
8
);
this
->
buffer
[
length
++
]
=
((
this
->
keepAlive
)
&
0xFF
);
this
->
buffer
[
length
++
]
=
((
this
->
keepAlive
)
&
0xFF
);
CHECK_STRING_LENGTH
(
length
,
id
)
CHECK_STRING_LENGTH
(
length
,
id
)
length
=
writeString
(
id
,
this
->
buffer
,
length
);
length
=
writeString
(
id
,
this
->
buffer
,
length
);
if
(
willTopic
)
{
if
(
willTopic
)
{
CHECK_STRING_LENGTH
(
length
,
willTopic
)
CHECK_STRING_LENGTH
(
length
,
willTopic
)
length
=
writeString
(
willTopic
,
this
->
buffer
,
length
);
length
=
writeString
(
willTopic
,
this
->
buffer
,
length
);
CHECK_STRING_LENGTH
(
length
,
willMessage
)
CHECK_STRING_LENGTH
(
length
,
willMessage
)
length
=
writeString
(
willMessage
,
this
->
buffer
,
length
);
length
=
writeString
(
willMessage
,
this
->
buffer
,
length
);
}
}
if
(
user
!=
NULL
)
{
if
(
user
!=
NULL
)
{
CHECK_STRING_LENGTH
(
length
,
user
)
CHECK_STRING_LENGTH
(
length
,
user
)
length
=
writeString
(
user
,
this
->
buffer
,
length
);
length
=
writeString
(
user
,
this
->
buffer
,
length
);
if
(
pass
!=
NULL
)
{
if
(
pass
!=
NULL
)
{
CHECK_STRING_LENGTH
(
length
,
pass
)
CHECK_STRING_LENGTH
(
length
,
pass
)
length
=
writeString
(
pass
,
this
->
buffer
,
length
);
length
=
writeString
(
pass
,
this
->
buffer
,
length
);
}
}
}
}
write
(
MQTTCONNECT
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
write
(
MQTTCONNECT
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
lastInActivity
=
lastOutActivity
=
millis
();
lastInActivity
=
lastOutActivity
=
millis
();
while
(
!
_client
->
available
())
{
while
(
!
_client
->
available
())
{
unsigned
long
t
=
millis
();
unsigned
long
t
=
millis
();
if
(
t
-
lastInActivity
>=
((
int32_t
)
this
->
socketTimeout
*
1000UL
))
{
if
(
t
-
lastInActivity
>=
((
int32_t
)
this
->
socketTimeout
*
1000UL
))
{
_state
=
MQTT_CONNECTION_TIMEOUT
;
_state
=
MQTT_CONNECTION_TIMEOUT
;
_client
->
stop
();
_client
->
stop
();
return
false
;
return
false
;
}
}
}
}
uint8_t
llen
;
uint8_t
llen
;
uint32_t
len
=
readPacket
(
&
llen
);
uint32_t
len
=
readPacket
(
&
llen
);
if
(
len
==
4
)
{
if
(
len
==
4
)
{
if
(
buffer
[
3
]
==
0
)
{
if
(
buffer
[
3
]
==
0
)
{
lastInActivity
=
millis
();
lastInActivity
=
millis
();
pingOutstanding
=
false
;
pingOutstanding
=
false
;
_state
=
MQTT_CONNECTED
;
_state
=
MQTT_CONNECTED
;
return
true
;
return
true
;
}
else
{
_state
=
buffer
[
3
];
}
}
_client
->
stop
();
}
else
{
}
else
{
_state
=
MQTT_CONNECT_FAILED
;
_state
=
buffer
[
3
]
;
}
}
return
false
;
}
_client
->
stop
();
}
else
{
_state
=
MQTT_CONNECT_FAILED
;
}
}
return
true
;
return
false
;
}
return
true
;
}
}
// reads a byte into result
// reads a byte into result
boolean
PubSubClient
::
readByte
(
uint8_t
*
result
)
{
boolean
PubSubClient
::
readByte
(
uint8_t
*
result
)
{
uint32_t
previousMillis
=
millis
();
uint32_t
previousMillis
=
millis
();
while
(
!
_client
->
available
())
{
while
(
!
_client
->
available
())
{
yield
();
yield
();
uint32_t
currentMillis
=
millis
();
uint32_t
currentMillis
=
millis
();
if
(
currentMillis
-
previousMillis
>=
((
int32_t
)
this
->
socketTimeout
*
1000
)){
if
(
currentMillis
-
previousMillis
>=
((
int32_t
)
this
->
socketTimeout
*
1000
))
{
return
false
;
return
false
;
}
}
}
}
*
result
=
_client
->
read
();
*
result
=
_client
->
read
();
return
true
;
return
true
;
}
}
// reads a byte into result[*index] and increments index
// reads a byte into result[*index] and increments index
boolean
PubSubClient
::
readByte
(
uint8_t
*
result
,
uint16_t
*
index
){
boolean
PubSubClient
::
readByte
(
uint8_t
*
result
,
uint16_t
*
index
)
{
uint16_t
current_index
=
*
index
;
uint16_t
current_index
=
*
index
;
uint8_t
*
write_address
=
&
(
result
[
current_index
]);
uint8_t
*
write_address
=
&
(
result
[
current_index
]);
if
(
readByte
(
write_address
)){
if
(
readByte
(
write_address
))
{
*
index
=
current_index
+
1
;
*
index
=
current_index
+
1
;
return
true
;
return
true
;
}
}
return
false
;
return
false
;
}
}
uint32_t
PubSubClient
::
readPacket
(
uint8_t
*
lengthLength
)
{
uint32_t
PubSubClient
::
readPacket
(
uint8_t
*
lengthLength
)
{
uint16_t
len
=
0
;
uint16_t
len
=
0
;
if
(
!
readByte
(
this
->
buffer
,
&
len
))
return
0
;
if
(
!
readByte
(
this
->
buffer
,
&
len
))
bool
isPublish
=
(
this
->
buffer
[
0
]
&
0xF0
)
==
MQTTPUBLISH
;
return
0
;
uint32_t
multiplier
=
1
;
bool
isPublish
=
(
this
->
buffer
[
0
]
&
0xF0
)
==
MQTTPUBLISH
;
uint32_t
length
=
0
;
uint32_t
multiplier
=
1
;
uint8_t
digit
=
0
;
uint32_t
length
=
0
;
uint16_t
skip
=
0
;
uint8_t
digit
=
0
;
uint32_t
start
=
0
;
uint16_t
skip
=
0
;
uint32_t
start
=
0
;
do
{
if
(
len
==
5
)
{
do
{
// Invalid remaining length encoding - kill the connection
if
(
len
==
5
)
{
_state
=
MQTT_DISCONNECTED
;
// Invalid remaining length encoding - kill the connection
_client
->
stop
();
_state
=
MQTT_DISCONNECTED
;
return
0
;
_client
->
stop
();
}
return
0
;
if
(
!
readByte
(
&
digit
))
return
0
;
this
->
buffer
[
len
++
]
=
digit
;
length
+=
(
digit
&
127
)
*
multiplier
;
multiplier
<<=
7
;
//multiplier *= 128
}
while
((
digit
&
128
)
!=
0
);
*
lengthLength
=
len
-
1
;
if
(
isPublish
)
{
// Read in topic length to calculate bytes to skip over for Stream writing
if
(
!
readByte
(
this
->
buffer
,
&
len
))
return
0
;
if
(
!
readByte
(
this
->
buffer
,
&
len
))
return
0
;
skip
=
(
this
->
buffer
[
*
lengthLength
+
1
]
<<
8
)
+
this
->
buffer
[
*
lengthLength
+
2
];
start
=
2
;
if
(
this
->
buffer
[
0
]
&
MQTTQOS1
)
{
// skip message id
skip
+=
2
;
}
}
}
uint32_t
idx
=
len
;
if
(
!
readByte
(
&
digit
))
return
0
;
for
(
uint32_t
i
=
start
;
i
<
length
;
i
++
)
{
this
->
buffer
[
len
++
]
=
digit
;
if
(
!
readByte
(
&
digit
))
return
0
;
length
+=
(
digit
&
127
)
*
multiplier
;
if
(
this
->
stream
)
{
multiplier
<<=
7
;
//multiplier *= 128
if
(
isPublish
&&
idx
-*
lengthLength
-
2
>
skip
)
{
}
while
((
digit
&
128
)
!=
0
);
this
->
stream
->
write
(
digit
);
*
lengthLength
=
len
-
1
;
}
}
if
(
isPublish
)
{
// Read in topic length to calculate bytes to skip over for Stream writing
if
(
len
<
this
->
bufferSize
)
{
if
(
!
readByte
(
this
->
buffer
,
&
len
))
this
->
buffer
[
len
]
=
digit
;
return
0
;
len
++
;
if
(
!
readByte
(
this
->
buffer
,
&
len
))
}
return
0
;
idx
++
;
skip
=
(
this
->
buffer
[
*
lengthLength
+
1
]
<<
8
)
+
this
->
buffer
[
*
lengthLength
+
2
];
start
=
2
;
if
(
this
->
buffer
[
0
]
&
MQTTQOS1
)
{
// skip message id
skip
+=
2
;
}
}
uint32_t
idx
=
len
;
for
(
uint32_t
i
=
start
;
i
<
length
;
i
++
)
{
if
(
!
readByte
(
&
digit
))
return
0
;
if
(
this
->
stream
)
{
if
(
isPublish
&&
idx
-
*
lengthLength
-
2
>
skip
)
{
this
->
stream
->
write
(
digit
);
}
}
}
if
(
!
this
->
stream
&&
idx
>
this
->
bufferSize
)
{
if
(
len
<
this
->
bufferSize
)
{
len
=
0
;
// This will cause the packet to be ignored.
this
->
buffer
[
len
]
=
digit
;
len
++
;
}
}
return
len
;
idx
++
;
}
if
(
!
this
->
stream
&&
idx
>
this
->
bufferSize
)
{
len
=
0
;
// This will cause the packet to be ignored.
}
return
len
;
}
}
boolean
PubSubClient
::
loop
()
{
boolean
PubSubClient
::
loop
()
{
if
(
connected
())
{
if
(
connected
())
{
unsigned
long
t
=
millis
();
unsigned
long
t
=
millis
();
if
((
t
-
lastInActivity
>
this
->
keepAlive
*
1000UL
)
||
(
t
-
lastOutActivity
>
this
->
keepAlive
*
1000UL
))
{
if
((
t
-
lastInActivity
>
this
->
keepAlive
*
1000UL
)
||
(
t
-
lastOutActivity
>
this
->
keepAlive
*
1000UL
))
{
if
(
pingOutstanding
)
{
if
(
pingOutstanding
)
{
this
->
_state
=
MQTT_CONNECTION_TIMEOUT
;
this
->
_state
=
MQTT_CONNECTION_TIMEOUT
;
_client
->
stop
();
_client
->
stop
();
return
false
;
return
false
;
}
else
{
this
->
buffer
[
0
]
=
MQTTPINGREQ
;
this
->
buffer
[
1
]
=
0
;
_client
->
write
(
this
->
buffer
,
2
);
lastOutActivity
=
t
;
lastInActivity
=
t
;
pingOutstanding
=
true
;
}
}
if
(
_client
->
available
())
{
uint8_t
llen
;
uint16_t
len
=
readPacket
(
&
llen
);
uint16_t
msgId
=
0
;
uint8_t
*
payload
;
if
(
len
>
0
)
{
lastInActivity
=
t
;
uint8_t
type
=
this
->
buffer
[
0
]
&
0xF0
;
if
(
type
==
MQTTPUBLISH
)
{
if
(
callback
)
{
uint16_t
tl
=
(
this
->
buffer
[
llen
+
1
]
<<
8
)
+
this
->
buffer
[
llen
+
2
];
/* topic length in bytes */
memmove
(
this
->
buffer
+
llen
+
2
,
this
->
buffer
+
llen
+
3
,
tl
);
/* move topic inside buffer 1 byte to front */
this
->
buffer
[
llen
+
2
+
tl
]
=
0
;
/* end the topic as a 'C' string with \x00 */
char
*
topic
=
(
char
*
)
this
->
buffer
+
llen
+
2
;
// msgId only present for QOS>0
if
((
this
->
buffer
[
0
]
&
0x06
)
==
MQTTQOS1
)
{
msgId
=
(
this
->
buffer
[
llen
+
3
+
tl
]
<<
8
)
+
this
->
buffer
[
llen
+
3
+
tl
+
1
];
payload
=
this
->
buffer
+
llen
+
3
+
tl
+
2
;
callback
(
topic
,
payload
,
len
-
llen
-
3
-
tl
-
2
);
this
->
buffer
[
0
]
=
MQTTPUBACK
;
this
->
buffer
[
1
]
=
2
;
this
->
buffer
[
2
]
=
(
msgId
>>
8
);
this
->
buffer
[
3
]
=
(
msgId
&
0xFF
);
_client
->
write
(
this
->
buffer
,
4
);
lastOutActivity
=
t
;
}
else
{
}
else
{
this
->
buffer
[
0
]
=
MQTTPINGREQ
;
payload
=
this
->
buffer
+
llen
+
3
+
tl
;
this
->
buffer
[
1
]
=
0
;
callback
(
topic
,
payload
,
len
-
llen
-
3
-
tl
);
_client
->
write
(
this
->
buffer
,
2
);
lastOutActivity
=
t
;
lastInActivity
=
t
;
pingOutstanding
=
true
;
}
}
if
(
_client
->
available
())
{
uint8_t
llen
;
uint16_t
len
=
readPacket
(
&
llen
);
uint16_t
msgId
=
0
;
uint8_t
*
payload
;
if
(
len
>
0
)
{
lastInActivity
=
t
;
uint8_t
type
=
this
->
buffer
[
0
]
&
0xF0
;
if
(
type
==
MQTTPUBLISH
)
{
if
(
callback
)
{
uint16_t
tl
=
(
this
->
buffer
[
llen
+
1
]
<<
8
)
+
this
->
buffer
[
llen
+
2
];
/* topic length in bytes */
memmove
(
this
->
buffer
+
llen
+
2
,
this
->
buffer
+
llen
+
3
,
tl
);
/* move topic inside buffer 1 byte to front */
this
->
buffer
[
llen
+
2
+
tl
]
=
0
;
/* end the topic as a 'C' string with \x00 */
char
*
topic
=
(
char
*
)
this
->
buffer
+
llen
+
2
;
// msgId only present for QOS>0
if
((
this
->
buffer
[
0
]
&
0x06
)
==
MQTTQOS1
)
{
msgId
=
(
this
->
buffer
[
llen
+
3
+
tl
]
<<
8
)
+
this
->
buffer
[
llen
+
3
+
tl
+
1
];
payload
=
this
->
buffer
+
llen
+
3
+
tl
+
2
;
callback
(
topic
,
payload
,
len
-
llen
-
3
-
tl
-
2
);
this
->
buffer
[
0
]
=
MQTTPUBACK
;
this
->
buffer
[
1
]
=
2
;
this
->
buffer
[
2
]
=
(
msgId
>>
8
);
this
->
buffer
[
3
]
=
(
msgId
&
0xFF
);
_client
->
write
(
this
->
buffer
,
4
);
lastOutActivity
=
t
;
}
else
{
payload
=
this
->
buffer
+
llen
+
3
+
tl
;
callback
(
topic
,
payload
,
len
-
llen
-
3
-
tl
);
}
}
}
else
if
(
type
==
MQTTPINGREQ
)
{
this
->
buffer
[
0
]
=
MQTTPINGRESP
;
this
->
buffer
[
1
]
=
0
;
_client
->
write
(
this
->
buffer
,
2
);
}
else
if
(
type
==
MQTTPINGRESP
)
{
pingOutstanding
=
false
;
}
}
else
if
(
!
connected
())
{
// readPacket has closed the connection
return
false
;
}
}
}
}
else
if
(
type
==
MQTTPINGREQ
)
{
this
->
buffer
[
0
]
=
MQTTPINGRESP
;
this
->
buffer
[
1
]
=
0
;
_client
->
write
(
this
->
buffer
,
2
);
}
else
if
(
type
==
MQTTPINGRESP
)
{
pingOutstanding
=
false
;
}
}
return
true
;
}
else
if
(
!
connected
())
{
// readPacket has closed the connection
return
false
;
}
}
}
return
false
;
return
true
;
}
return
false
;
}
}
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
char
*
payload
)
{
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
char
*
payload
)
{
return
publish
(
topic
,(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
false
);
return
publish
(
topic
,
(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
false
);
}
}
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
char
*
payload
,
boolean
retained
)
{
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
char
*
payload
,
boolean
retained
)
{
return
publish
(
topic
,(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
retained
);
return
publish
(
topic
,
(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
retained
);
}
}
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
)
{
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
)
{
return
publish
(
topic
,
payload
,
plength
,
false
);
return
publish
(
topic
,
payload
,
plength
,
false
);
}
}
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
,
boolean
retained
)
{
boolean
PubSubClient
::
publish
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
,
boolean
retained
)
{
if
(
connected
())
{
if
(
connected
())
{
if
(
this
->
bufferSize
<
MQTT_MAX_HEADER_SIZE
+
2
+
strnlen
(
topic
,
this
->
bufferSize
)
+
plength
)
{
if
(
this
->
bufferSize
<
MQTT_MAX_HEADER_SIZE
+
2
+
strnlen
(
topic
,
this
->
bufferSize
)
+
plength
)
{
// Too long
// Too long
return
false
;
return
false
;
}
}
// Leave room in the buffer for header and variable length field
// Leave room in the buffer for header and variable length field
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
// Add payload
// Add payload
uint16_t
i
;
uint16_t
i
;
for
(
i
=
0
;
i
<
plength
;
i
++
)
{
for
(
i
=
0
;
i
<
plength
;
i
++
)
{
this
->
buffer
[
length
++
]
=
payload
[
i
];
this
->
buffer
[
length
++
]
=
payload
[
i
];
}
}
// Write the header
// Write the header
uint8_t
header
=
MQTTPUBLISH
;
uint8_t
header
=
MQTTPUBLISH
;
if
(
retained
)
{
if
(
retained
)
{
header
|=
1
;
header
|=
1
;
}
return
write
(
header
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
}
return
false
;
return
write
(
header
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
return
false
;
}
}
boolean
PubSubClient
::
publish_P
(
const
char
*
topic
,
const
char
*
payload
,
boolean
retained
)
{
boolean
PubSubClient
::
publish_P
(
const
char
*
topic
,
const
char
*
payload
,
boolean
retained
)
{
return
publish_P
(
topic
,
(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
retained
);
return
publish_P
(
topic
,
(
const
uint8_t
*
)
payload
,
payload
?
strnlen
(
payload
,
this
->
bufferSize
)
:
0
,
retained
);
}
}
boolean
PubSubClient
::
publish_P
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
,
boolean
retained
)
{
boolean
PubSubClient
::
publish_P
(
const
char
*
topic
,
const
uint8_t
*
payload
,
unsigned
int
plength
,
boolean
retained
)
{
uint8_t
llen
=
0
;
uint8_t
llen
=
0
;
uint8_t
digit
;
uint8_t
digit
;
unsigned
int
rc
=
0
;
unsigned
int
rc
=
0
;
uint16_t
tlen
;
uint16_t
tlen
;
unsigned
int
pos
=
0
;
unsigned
int
pos
=
0
;
unsigned
int
i
;
unsigned
int
i
;
uint8_t
header
;
uint8_t
header
;
unsigned
int
len
;
unsigned
int
len
;
in
t
expectedLength
;
uint32_
t
expectedLength
;
if
(
!
connected
())
{
if
(
!
connected
())
{
return
false
;
return
false
;
}
}
tlen
=
strnlen
(
topic
,
this
->
bufferSize
);
tlen
=
strnlen
(
topic
,
this
->
bufferSize
);
header
=
MQTTPUBLISH
;
header
=
MQTTPUBLISH
;
if
(
retained
)
{
if
(
retained
)
{
header
|=
1
;
header
|=
1
;
}
this
->
buffer
[
pos
++
]
=
header
;
len
=
plength
+
2
+
tlen
;
do
{
digit
=
len
&
127
;
//digit = len %128
len
>>=
7
;
//len = len / 128
if
(
len
>
0
)
{
digit
|=
0x80
;
}
}
this
->
buffer
[
pos
++
]
=
header
;
this
->
buffer
[
pos
++
]
=
digit
;
len
=
plength
+
2
+
tlen
;
llen
++
;
do
{
}
while
(
len
>
0
);
digit
=
len
&
127
;
//digit = len %128
len
>>=
7
;
//len = len / 128
if
(
len
>
0
)
{
digit
|=
0x80
;
}
this
->
buffer
[
pos
++
]
=
digit
;
llen
++
;
}
while
(
len
>
0
);
pos
=
writeString
(
topic
,
this
->
buffer
,
pos
);
pos
=
writeString
(
topic
,
this
->
buffer
,
pos
);
rc
+=
_client
->
write
(
this
->
buffer
,
pos
);
rc
+=
_client
->
write
(
this
->
buffer
,
pos
);
for
(
i
=
0
;
i
<
plength
;
i
++
)
{
for
(
i
=
0
;
i
<
plength
;
i
++
)
{
rc
+=
_client
->
write
((
char
)
pgm_read_byte_near
(
payload
+
i
));
rc
+=
_client
->
write
((
char
)
pgm_read_byte_near
(
payload
+
i
));
}
}
lastOutActivity
=
millis
();
lastOutActivity
=
millis
();
expectedLength
=
1
+
llen
+
2
+
tlen
+
plength
;
expectedLength
=
1
+
llen
+
2
+
tlen
+
plength
;
return
(
rc
==
expectedLength
);
return
(
rc
==
expectedLength
);
}
}
boolean
PubSubClient
::
beginPublish
(
const
char
*
topic
,
unsigned
int
plength
,
boolean
retained
)
{
boolean
PubSubClient
::
beginPublish
(
const
char
*
topic
,
unsigned
int
plength
,
boolean
retained
)
{
if
(
connected
())
{
if
(
connected
())
{
// Send the header and variable length field
// Send the header and variable length field
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
uint8_t
header
=
MQTTPUBLISH
;
uint8_t
header
=
MQTTPUBLISH
;
if
(
retained
)
{
if
(
retained
)
{
header
|=
1
;
header
|=
1
;
}
size_t
hlen
=
buildHeader
(
header
,
this
->
buffer
,
plength
+
length
-
MQTT_MAX_HEADER_SIZE
);
uint16_t
rc
=
_client
->
write
(
this
->
buffer
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
),
length
-
(
MQTT_MAX_HEADER_SIZE
-
hlen
));
lastOutActivity
=
millis
();
return
(
rc
==
(
length
-
(
MQTT_MAX_HEADER_SIZE
-
hlen
)));
}
}
return
false
;
size_t
hlen
=
buildHeader
(
header
,
this
->
buffer
,
plength
+
length
-
MQTT_MAX_HEADER_SIZE
);
uint16_t
rc
=
_client
->
write
(
this
->
buffer
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
),
length
-
(
MQTT_MAX_HEADER_SIZE
-
hlen
));
lastOutActivity
=
millis
();
return
(
rc
==
(
length
-
(
MQTT_MAX_HEADER_SIZE
-
hlen
)));
}
return
false
;
}
}
int
PubSubClient
::
endPublish
()
{
int
PubSubClient
::
endPublish
()
{
return
1
;
return
1
;
}
}
size_t
PubSubClient
::
write
(
uint8_t
data
)
{
size_t
PubSubClient
::
write
(
uint8_t
data
)
{
lastOutActivity
=
millis
();
lastOutActivity
=
millis
();
return
_client
->
write
(
data
);
return
_client
->
write
(
data
);
}
}
size_t
PubSubClient
::
write
(
const
uint8_t
*
buffer
,
size_t
size
)
{
size_t
PubSubClient
::
write
(
const
uint8_t
*
buffer
,
size_t
size
)
{
lastOutActivity
=
millis
();
lastOutActivity
=
millis
();
return
_client
->
write
(
buffer
,
size
);
return
_client
->
write
(
buffer
,
size
);
}
}
size_t
PubSubClient
::
buildHeader
(
uint8_t
header
,
uint8_t
*
buf
,
uint16_t
length
)
{
size_t
PubSubClient
::
buildHeader
(
uint8_t
header
,
uint8_t
*
buf
,
uint16_t
length
)
{
uint8_t
lenBuf
[
4
];
uint8_t
lenBuf
[
4
];
uint8_t
llen
=
0
;
uint8_t
llen
=
0
;
uint8_t
digit
;
uint8_t
digit
;
uint8_t
pos
=
0
;
uint8_t
pos
=
0
;
uint16_t
len
=
length
;
uint16_t
len
=
length
;
do
{
do
{
digit
=
len
&
127
;
//digit = len %128
digit
=
len
&
127
;
//digit = len %128
len
>>=
7
;
//len = len / 128
len
>>=
7
;
//len = len / 128
if
(
len
>
0
)
{
if
(
len
>
0
)
{
digit
|=
0x80
;
digit
|=
0x80
;
}
lenBuf
[
pos
++
]
=
digit
;
llen
++
;
}
while
(
len
>
0
);
buf
[
4
-
llen
]
=
header
;
for
(
int
i
=
0
;
i
<
llen
;
i
++
)
{
buf
[
MQTT_MAX_HEADER_SIZE
-
llen
+
i
]
=
lenBuf
[
i
];
}
}
return
llen
+
1
;
// Full header size is variable length bit plus the 1-byte fixed header
lenBuf
[
pos
++
]
=
digit
;
llen
++
;
}
while
(
len
>
0
);
buf
[
4
-
llen
]
=
header
;
for
(
int
i
=
0
;
i
<
llen
;
i
++
)
{
buf
[
MQTT_MAX_HEADER_SIZE
-
llen
+
i
]
=
lenBuf
[
i
];
}
return
llen
+
1
;
// Full header size is variable length bit plus the 1-byte fixed header
}
}
boolean
PubSubClient
::
write
(
uint8_t
header
,
uint8_t
*
buf
,
uint16_t
length
)
{
boolean
PubSubClient
::
write
(
uint8_t
header
,
uint8_t
*
buf
,
uint16_t
length
)
{
uint16_t
rc
;
uint16_t
rc
;
uint8_t
hlen
=
buildHeader
(
header
,
buf
,
length
);
uint8_t
hlen
=
buildHeader
(
header
,
buf
,
length
);
#ifdef MQTT_MAX_TRANSFER_SIZE
#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t
*
writeBuf
=
buf
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
);
uint8_t
*
writeBuf
=
buf
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
);
...
@@ -596,174 +603,173 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
...
@@ -596,174 +603,173 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
}
}
return
result
;
return
result
;
#else
#else
rc
=
_client
->
write
(
buf
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
),
length
+
hlen
);
rc
=
_client
->
write
(
buf
+
(
MQTT_MAX_HEADER_SIZE
-
hlen
),
length
+
hlen
);
lastOutActivity
=
millis
();
lastOutActivity
=
millis
();
return
(
rc
==
hlen
+
length
);
return
(
rc
==
hlen
+
length
);
#endif
#endif
}
}
boolean
PubSubClient
::
subscribe
(
const
char
*
topic
)
{
boolean
PubSubClient
::
subscribe
(
const
char
*
topic
)
{
return
subscribe
(
topic
,
0
);
return
subscribe
(
topic
,
0
);
}
}
boolean
PubSubClient
::
subscribe
(
const
char
*
topic
,
uint8_t
qos
)
{
boolean
PubSubClient
::
subscribe
(
const
char
*
topic
,
uint8_t
qos
)
{
size_t
topicLength
=
strnlen
(
topic
,
this
->
bufferSize
);
size_t
topicLength
=
strnlen
(
topic
,
this
->
bufferSize
);
if
(
topic
==
0
)
{
if
(
topic
==
0
)
{
return
false
;
return
false
;
}
}
if
(
qos
>
1
)
{
if
(
qos
>
1
)
{
return
false
;
}
if
(
this
->
bufferSize
<
9
+
topicLength
)
{
// Too long
return
false
;
}
if
(
connected
())
{
// Leave room in the buffer for header and variable length field
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
nextMsgId
++
;
if
(
nextMsgId
==
0
)
{
nextMsgId
=
1
;
}
this
->
buffer
[
length
++
]
=
(
nextMsgId
>>
8
);
this
->
buffer
[
length
++
]
=
(
nextMsgId
&
0xFF
);
length
=
writeString
((
char
*
)
topic
,
this
->
buffer
,
length
);
this
->
buffer
[
length
++
]
=
qos
;
return
write
(
MQTTSUBSCRIBE
|
MQTTQOS1
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
return
false
;
return
false
;
}
if
(
this
->
bufferSize
<
9
+
topicLength
)
{
// Too long
return
false
;
}
if
(
connected
())
{
// Leave room in the buffer for header and variable length field
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
nextMsgId
++
;
if
(
nextMsgId
==
0
)
{
nextMsgId
=
1
;
}
this
->
buffer
[
length
++
]
=
(
nextMsgId
>>
8
);
this
->
buffer
[
length
++
]
=
(
nextMsgId
&
0xFF
);
length
=
writeString
((
char
*
)
topic
,
this
->
buffer
,
length
);
this
->
buffer
[
length
++
]
=
qos
;
return
write
(
MQTTSUBSCRIBE
|
MQTTQOS1
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
return
false
;
}
}
boolean
PubSubClient
::
unsubscribe
(
const
char
*
topic
)
{
boolean
PubSubClient
::
unsubscribe
(
const
char
*
topic
)
{
size_t
topicLength
=
strnlen
(
topic
,
this
->
bufferSize
);
size_t
topicLength
=
strnlen
(
topic
,
this
->
bufferSize
);
if
(
topic
==
0
)
{
if
(
topic
==
0
)
{
return
false
;
}
if
(
this
->
bufferSize
<
9
+
topicLength
)
{
// Too long
return
false
;
}
if
(
connected
())
{
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
nextMsgId
++
;
if
(
nextMsgId
==
0
)
{
nextMsgId
=
1
;
}
this
->
buffer
[
length
++
]
=
(
nextMsgId
>>
8
);
this
->
buffer
[
length
++
]
=
(
nextMsgId
&
0xFF
);
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
return
write
(
MQTTUNSUBSCRIBE
|
MQTTQOS1
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
return
false
;
return
false
;
}
if
(
this
->
bufferSize
<
9
+
topicLength
)
{
// Too long
return
false
;
}
if
(
connected
())
{
uint16_t
length
=
MQTT_MAX_HEADER_SIZE
;
nextMsgId
++
;
if
(
nextMsgId
==
0
)
{
nextMsgId
=
1
;
}
this
->
buffer
[
length
++
]
=
(
nextMsgId
>>
8
);
this
->
buffer
[
length
++
]
=
(
nextMsgId
&
0xFF
);
length
=
writeString
(
topic
,
this
->
buffer
,
length
);
return
write
(
MQTTUNSUBSCRIBE
|
MQTTQOS1
,
this
->
buffer
,
length
-
MQTT_MAX_HEADER_SIZE
);
}
return
false
;
}
}
void
PubSubClient
::
disconnect
()
{
void
PubSubClient
::
disconnect
()
{
this
->
buffer
[
0
]
=
MQTTDISCONNECT
;
this
->
buffer
[
0
]
=
MQTTDISCONNECT
;
this
->
buffer
[
1
]
=
0
;
this
->
buffer
[
1
]
=
0
;
_client
->
write
(
this
->
buffer
,
2
);
_client
->
write
(
this
->
buffer
,
2
);
_state
=
MQTT_DISCONNECTED
;
_state
=
MQTT_DISCONNECTED
;
_client
->
flush
();
_client
->
flush
();
_client
->
stop
();
_client
->
stop
();
lastInActivity
=
lastOutActivity
=
millis
();
lastInActivity
=
lastOutActivity
=
millis
();
}
}
uint16_t
PubSubClient
::
writeString
(
const
char
*
string
,
uint8_t
*
buf
,
uint16_t
pos
)
{
uint16_t
PubSubClient
::
writeString
(
const
char
*
string
,
uint8_t
*
buf
,
uint16_t
pos
)
{
const
char
*
idp
=
string
;
const
char
*
idp
=
string
;
uint16_t
i
=
0
;
uint16_t
i
=
0
;
pos
+=
2
;
pos
+=
2
;
while
(
*
idp
)
{
while
(
*
idp
)
{
buf
[
pos
++
]
=
*
idp
++
;
buf
[
pos
++
]
=
*
idp
++
;
i
++
;
i
++
;
}
}
buf
[
pos
-
i
-
2
]
=
(
i
>>
8
);
buf
[
pos
-
i
-
2
]
=
(
i
>>
8
);
buf
[
pos
-
i
-
1
]
=
(
i
&
0xFF
);
buf
[
pos
-
i
-
1
]
=
(
i
&
0xFF
);
return
pos
;
return
pos
;
}
}
boolean
PubSubClient
::
connected
()
{
boolean
PubSubClient
::
connected
()
{
boolean
rc
;
boolean
rc
;
if
(
_client
==
NULL
)
{
if
(
_client
==
NULL
)
{
rc
=
false
;
rc
=
false
;
}
else
{
rc
=
(
int
)
_client
->
connected
();
if
(
!
rc
)
{
if
(
this
->
_state
==
MQTT_CONNECTED
)
{
this
->
_state
=
MQTT_CONNECTION_LOST
;
_client
->
flush
();
_client
->
stop
();
}
}
else
{
}
else
{
rc
=
(
int
)
_client
->
connected
();
return
this
->
_state
==
MQTT_CONNECTED
;
if
(
!
rc
)
{
if
(
this
->
_state
==
MQTT_CONNECTED
)
{
this
->
_state
=
MQTT_CONNECTION_LOST
;
_client
->
flush
();
_client
->
stop
();
}
}
else
{
return
this
->
_state
==
MQTT_CONNECTED
;
}
}
}
return
rc
;
}
return
rc
;
}
}
PubSubClient
&
PubSubClient
::
setServer
(
uint8_t
*
ip
,
uint16_t
port
)
{
PubSubClient
&
PubSubClient
::
setServer
(
uint8_t
*
ip
,
uint16_t
port
)
{
IPAddress
addr
(
ip
[
0
],
ip
[
1
],
ip
[
2
],
ip
[
3
]);
IPAddress
addr
(
ip
[
0
],
ip
[
1
],
ip
[
2
],
ip
[
3
]);
return
setServer
(
addr
,
port
);
return
setServer
(
addr
,
port
);
}
}
PubSubClient
&
PubSubClient
::
setServer
(
IPAddress
ip
,
uint16_t
port
)
{
PubSubClient
&
PubSubClient
::
setServer
(
IPAddress
ip
,
uint16_t
port
)
{
this
->
ip
=
ip
;
this
->
ip
=
ip
;
this
->
port
=
port
;
this
->
port
=
port
;
this
->
domain
=
NULL
;
this
->
domain
=
NULL
;
return
*
this
;
return
*
this
;
}
}
PubSubClient
&
PubSubClient
::
setServer
(
const
char
*
domain
,
uint16_t
port
)
{
PubSubClient
&
PubSubClient
::
setServer
(
const
char
*
domain
,
uint16_t
port
)
{
this
->
domain
=
domain
;
this
->
domain
=
domain
;
this
->
port
=
port
;
this
->
port
=
port
;
return
*
this
;
return
*
this
;
}
}
PubSubClient
&
PubSubClient
::
setCallback
(
MQTT_CALLBACK_SIGNATURE
)
{
PubSubClient
&
PubSubClient
::
setCallback
(
MQTT_CALLBACK_SIGNATURE
)
{
this
->
callback
=
callback
;
this
->
callback
=
callback
;
return
*
this
;
return
*
this
;
}
}
PubSubClient
&
PubSubClient
::
setClient
(
Client
&
client
){
PubSubClient
&
PubSubClient
::
setClient
(
Client
&
client
)
{
this
->
_client
=
&
client
;
this
->
_client
=
&
client
;
return
*
this
;
return
*
this
;
}
}
PubSubClient
&
PubSubClient
::
setStream
(
Stream
&
stream
){
PubSubClient
&
PubSubClient
::
setStream
(
Stream
&
stream
)
{
this
->
stream
=
&
stream
;
this
->
stream
=
&
stream
;
return
*
this
;
return
*
this
;
}
}
int
PubSubClient
::
state
()
{
int
PubSubClient
::
state
()
{
return
this
->
_state
;
return
this
->
_state
;
}
}
boolean
PubSubClient
::
setBufferSize
(
uint16_t
size
)
{
boolean
PubSubClient
::
setBufferSize
(
uint16_t
size
)
{
if
(
size
==
0
)
{
if
(
size
==
0
)
{
// Cannot set it back to 0
// Cannot set it back to 0
return
false
;
return
false
;
}
}
if
(
this
->
bufferSize
==
0
)
{
if
(
this
->
bufferSize
==
0
)
{
this
->
buffer
=
(
uint8_t
*
)
malloc
(
size
);
this
->
buffer
=
(
uint8_t
*
)
malloc
(
size
);
}
else
{
uint8_t
*
newBuffer
=
(
uint8_t
*
)
realloc
(
this
->
buffer
,
size
);
if
(
newBuffer
!=
NULL
)
{
this
->
buffer
=
newBuffer
;
}
else
{
}
else
{
uint8_t
*
newBuffer
=
(
uint8_t
*
)
realloc
(
this
->
buffer
,
size
);
return
false
;
if
(
newBuffer
!=
NULL
)
{
this
->
buffer
=
newBuffer
;
}
else
{
return
false
;
}
}
}
this
->
bufferSize
=
size
;
}
return
(
this
->
buffer
!=
NULL
);
this
->
bufferSize
=
size
;
return
(
this
->
buffer
!=
NULL
);
}
}
uint16_t
PubSubClient
::
getBufferSize
()
{
uint16_t
PubSubClient
::
getBufferSize
()
{
return
this
->
bufferSize
;
return
this
->
bufferSize
;
}
}
PubSubClient
&
PubSubClient
::
setKeepAlive
(
uint16_t
keepAlive
)
{
PubSubClient
&
PubSubClient
::
setKeepAlive
(
uint16_t
keepAlive
)
{
this
->
keepAlive
=
keepAlive
;
this
->
keepAlive
=
keepAlive
;
return
*
this
;
return
*
this
;
}
}
PubSubClient
&
PubSubClient
::
setSocketTimeout
(
uint16_t
timeout
)
{
PubSubClient
&
PubSubClient
::
setSocketTimeout
(
uint16_t
timeout
)
{
this
->
socketTimeout
=
timeout
;
this
->
socketTimeout
=
timeout
;
return
*
this
;
return
*
this
;
}
}
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Explore
Projects
Groups
Snippets