/*
circular_queue.h - Implementation of a lock-free circular queue for EspSoftwareSerial.
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/

#ifndef __circular_queue_h
#define __circular_queue_h

#ifdef ARDUINO
#include <Arduino.h>
#endif

#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
#include <atomic>
#include <memory>
#include <algorithm>
#include "Delegate.h"
using std::min;
#else
#include "ghostl.h"
#endif

#if !defined(ESP32) && !defined(ESP8266)
#define IRAM_ATTR
#endif

/*!
    @brief	Instance class for a single-producer, single-consumer circular queue / ring buffer (FIFO).
            This implementation is lock-free between producer and consumer for the available(), peek(),
            pop(), and push() type functions.
*/
template< typename T, typename ForEachArg = void >
class circular_queue
{
public:
    /*!
        @brief	Constructs a valid, but zero-capacity dummy queue.
    */
    circular_queue() : m_bufSize(1)
    {
        m_inPos.store(0);
        m_outPos.store(0);
    }
    /*!
        @brief  Constructs a queue of the given maximum capacity.
    */
    circular_queue(const size_t capacity) : m_bufSize(capacity + 1), m_buffer(new T[m_bufSize])
    {
        m_inPos.store(0);
        m_outPos.store(0);
    }
    circular_queue(circular_queue&& cq) :
        m_bufSize(cq.m_bufSize), m_buffer(std::move(cq.m_buffer)), m_inPos(cq.m_inPos.load()), m_outPos(cq.m_outPos.load())
    {}
    ~circular_queue()
    {
        m_buffer.reset();
    }
    circular_queue(const circular_queue&) = delete;
    circular_queue& operator=(circular_queue&& cq)
    {
        m_bufSize = cq.m_bufSize;
        m_buffer = std::move(cq.m_buffer);
        m_inPos.store(cq.m_inPos.load());
        m_outPos.store(cq.m_outPos.load());
        return *this;
    }
    circular_queue& operator=(const circular_queue&) = delete;

    /*!
        @brief	Get the number of elements the queue can hold at most.
    */
    size_t capacity() const
    {
        return m_bufSize - 1;
    }

    /*!
        @brief	Resize the queue. The available elements in the queue are preserved.
                This is not lock-free and concurrent producer or consumer access
                will lead to corruption.
        @return True if the new capacity could accommodate the present elements in
                the queue, otherwise nothing is done and false is returned.
    */
    bool capacity(const size_t cap);

    /*!
        @brief	Discard all data in the queue.
    */
    void flush()
    {
        m_outPos.store(m_inPos.load());
    }

    /*!
        @brief	Get a snapshot of the number of elements that can be retrieved by pop.
    */
    size_t available() const
    {
        int avail = static_cast<int>(m_inPos.load() - m_outPos.load());
        if (avail < 0) avail += m_bufSize;
        return avail;
    }

    /*!
        @brief	Get the remaining number of free element slots available for pushing.
    */
    size_t available_for_push() const
    {
        int avail = static_cast<int>(m_outPos.load() - m_inPos.load()) - 1;
        if (avail < 0) avail += m_bufSize;
        return avail;
    }

    /*!
        @brief	Peek at the next element pop will return without removing it from the queue.
        @return An rvalue copy of the next element that can be popped. If the queue is empty,
                return an rvalue copy of the element that is pending the next push.
    */
    T peek() const
    {
        const auto outPos = m_outPos.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        return m_buffer[outPos];
    }

    /*!
        @brief	Peek at the next pending input value.
        @return A reference to the next element that can be pushed.
    */
    inline T& IRAM_ATTR pushpeek() __attribute__((always_inline))
    {
        const auto inPos = m_inPos.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        return m_buffer[inPos];
    }

    /*!
        @brief	Release the next pending input value, accessible by pushpeek(), into the queue.
        @return true if the queue accepted the value, false if the queue
                was full.
    */
    inline bool IRAM_ATTR push() __attribute__((always_inline))
    {
        const auto inPos = m_inPos.load(std::memory_order_acquire);
        const size_t next = (inPos + 1) % m_bufSize;
        if (next == m_outPos.load(std::memory_order_relaxed)) {
            return false;
        }
    
        std::atomic_thread_fence(std::memory_order_acquire);
    
        m_inPos.store(next, std::memory_order_release);
        return true;
    }
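
    /*
        Illustrative sketch only (not part of this header): pushpeek() and push() together
        allow producing an element in place, without a temporary copy, e.g. from an ISR.
        The queue name rxQueue and the ISR below are hypothetical; micros() is the Arduino
        timer function.

            static circular_queue<uint32_t> rxQueue(64);

            void IRAM_ATTR rxISR()
            {
                uint32_t& slot = rxQueue.pushpeek(); // write directly into the next input slot
                slot = micros();                     // fill the slot in place
                rxQueue.push();                      // publish it; false means the queue was full
            }
    */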

    /*!
        @brief	Move the rvalue parameter into the queue.
        @return true if the queue accepted the value, false if the queue
                was full.
    */
    inline bool IRAM_ATTR push(T&& val) __attribute__((always_inline))
    {
        const auto inPos = m_inPos.load(std::memory_order_acquire);
        const size_t next = (inPos + 1) % m_bufSize;
        if (next == m_outPos.load(std::memory_order_relaxed)) {
            return false;
        }
    
        std::atomic_thread_fence(std::memory_order_acquire);
    
        m_buffer[inPos] = std::move(val);
    
        std::atomic_thread_fence(std::memory_order_release);
    
        m_inPos.store(next, std::memory_order_release);
        return true;
    }

    /*!
        @brief	Push a copy of the parameter into the queue.
        @return true if the queue accepted the value, false if the queue
                was full.
    */
    inline bool IRAM_ATTR push(const T& val) __attribute__((always_inline))
    {
        T v(val);
        return push(std::move(v));
    }

#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
    /*!
        @brief	Push copies of multiple elements from a buffer into the queue,
                in order, beginning at buffer's head.
        @return The number of elements actually copied into the queue, counted
                from the buffer head.
    */
    size_t push_n(const T* buffer, size_t size);
#endif

    /*!
        @brief	Pop the next available element from the queue.
        @return An rvalue copy of the popped element, or a default
                value of type T if the queue is empty.
    */
    T pop();

#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
    /*!
        @brief	Pop multiple elements in ordered sequence from the queue to a buffer.
                If buffer is nullptr, simply discards up to size elements from the queue.
        @return The number of elements actually popped from the queue to
                buffer.
    */
    size_t pop_n(T* buffer, size_t size);
#endif
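
    /*
        Illustrative bulk-transfer sketch (not part of this header): push_n() copies as many
        elements from the source buffer as currently fit and returns that count; pop_n()
        moves up to size elements into the caller's buffer, or discards them when passed
        nullptr. The names txQueue and scratch are hypothetical.

            circular_queue<uint8_t> txQueue(128);
            uint8_t scratch[32] = { 0x55 };

            size_t queued  = txQueue.push_n(scratch, sizeof(scratch)); // may be < 32 if nearly full
            size_t fetched = txQueue.pop_n(scratch, queued);           // retrieves them in order
    */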

    /*!
        @brief	Iterate over and remove each available element from the queue,
                calling back fun with an rvalue reference to each element.
    */
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
    void for_each(const Delegate<void(T&&), ForEachArg>& fun);
#else
    void for_each(Delegate<void(T&&), ForEachArg> fun);
#endif

    /*!
        @brief	In reverse order, iterate over, pop, and optionally requeue each available element
                of the queue, calling back fun with a reference to each element.
                Requeuing depends on the boolean returned by the callback: if it returns
                true, the element is requeued.
    */
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
    bool for_each_rev_requeue(const Delegate<bool(T&), ForEachArg>& fun);
#else
    bool for_each_rev_requeue(Delegate<bool(T&), ForEachArg> fun);
#endif
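
    /*
        Illustrative consumer-side sketch (not part of this header): for_each() drains the
        queue through a callback, while for_each_rev_requeue() walks the elements newest-first
        and keeps only those for which the callback returns true. This assumes Delegate can
        wrap a plain lambda, as the Delegate.h/ghostl.h implementations provide; the variable
        names are hypothetical.

            circular_queue<int> q(8);
            q.push(1); q.push(2); q.push(3);

            int sum = 0;
            q.for_each([&](int&& v) { sum += v; });               // queue is empty afterwards

            q.push(4); q.push(5);
            q.for_each_rev_requeue([](int& v) { return v > 4; }); // only 5 remains queued
    */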

protected:
    const T defaultValue = {};
    size_t m_bufSize;
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
    std::unique_ptr<T[]> m_buffer;
#else
    std::unique_ptr<T> m_buffer;
#endif
    std::atomic<size_t> m_inPos;
    std::atomic<size_t> m_outPos;
};
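
/*
    Minimal single-producer / single-consumer usage sketch (illustrative, assuming one
    producer context and one consumer context, e.g. ISR vs. main loop); uartQueue is a
    hypothetical name.

        static circular_queue<char> uartQueue(64);

        // producer side
        bool ok = uartQueue.push('A');    // false if the queue was full

        // consumer side
        while (uartQueue.available())
        {
            char c = uartQueue.pop();     // returns a default-constructed T when empty
            (void)c;
        }
*/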

template< typename T, typename ForEachArg >
bool circular_queue<T, ForEachArg>::capacity(const size_t cap)
{
    if (cap + 1 == m_bufSize) return true;
    else if (available() > cap) return false;
    std::unique_ptr<T[]> buffer(new T[cap + 1]);
    const auto available = pop_n(buffer.get(), cap);
    m_buffer = std::move(buffer);
    m_bufSize = cap + 1;
    std::atomic_thread_fence(std::memory_order_release);
    m_inPos.store(available, std::memory_order_relaxed);
    m_outPos.store(0, std::memory_order_release);
    return true;
}

#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
template< typename T, typename ForEachArg >
size_t circular_queue<T, ForEachArg>::push_n(const T* buffer, size_t size)
{
    const auto inPos = m_inPos.load(std::memory_order_acquire);
    const auto outPos = m_outPos.load(std::memory_order_relaxed);

    size_t blockSize = (outPos > inPos) ? outPos - 1 - inPos : (outPos == 0) ? m_bufSize - 1 - inPos : m_bufSize - inPos;
    blockSize = min(size, blockSize);
    if (!blockSize) return 0;
    int next = (inPos + blockSize) % m_bufSize;

    std::atomic_thread_fence(std::memory_order_acquire);

    auto dest = m_buffer.get() + inPos;
    std::copy_n(std::make_move_iterator(buffer), blockSize, dest);
    size = min(size - blockSize, outPos > 1 ? static_cast<size_t>(outPos - next - 1) : 0);
    next += size;
    dest = m_buffer.get();
    std::copy_n(std::make_move_iterator(buffer + blockSize), size, dest);

    std::atomic_thread_fence(std::memory_order_release);

    m_inPos.store(next, std::memory_order_release);
    return blockSize + size;
}
#endif

template< typename T, typename ForEachArg >
T circular_queue<T, ForEachArg>::pop()
{
    const auto outPos = m_outPos.load(std::memory_order_acquire);
    if (m_inPos.load(std::memory_order_relaxed) == outPos) return defaultValue;

    std::atomic_thread_fence(std::memory_order_acquire);

    auto val = std::move(m_buffer[outPos]);

    std::atomic_thread_fence(std::memory_order_release);

    m_outPos.store((outPos + 1) % m_bufSize, std::memory_order_release);
    return val;
}

#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
template< typename T, typename ForEachArg >
size_t circular_queue<T, ForEachArg>::pop_n(T* buffer, size_t size) {
    size_t avail = size = min(size, available());
    if (!avail) return 0;
    const auto outPos = m_outPos.load(std::memory_order_acquire);
    size_t n = min(avail, static_cast<size_t>(m_bufSize - outPos));

    std::atomic_thread_fence(std::memory_order_acquire);

    if (buffer) {
        buffer = std::copy_n(std::make_move_iterator(m_buffer.get() + outPos), n, buffer);
        avail -= n;
        std::copy_n(std::make_move_iterator(m_buffer.get()), avail, buffer);
    }

    std::atomic_thread_fence(std::memory_order_release);

    m_outPos.store((outPos + size) % m_bufSize, std::memory_order_release);
    return size;
}
#endif

template< typename T, typename ForEachArg >
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
void circular_queue<T, ForEachArg>::for_each(const Delegate<void(T&&), ForEachArg>& fun)
#else
void circular_queue<T, ForEachArg>::for_each(Delegate<void(T&&), ForEachArg> fun)
#endif
{
    auto outPos = m_outPos.load(std::memory_order_acquire);
    const auto inPos = m_inPos.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    while (outPos != inPos)
    {
        fun(std::move(m_buffer[outPos]));
        std::atomic_thread_fence(std::memory_order_release);
        outPos = (outPos + 1) % m_bufSize;
        m_outPos.store(outPos, std::memory_order_release);
    }
}

template< typename T, typename ForEachArg >
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
bool circular_queue<T, ForEachArg>::for_each_rev_requeue(const Delegate<bool(T&), ForEachArg>& fun)
#else
bool circular_queue<T, ForEachArg>::for_each_rev_requeue(Delegate<bool(T&), ForEachArg> fun)
#endif
{
    auto inPos0 = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_acquire);
    auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (outPos == inPos0) return false;
    auto pos = inPos0;
    auto outPos1 = inPos0;
    const auto posDecr = circular_queue<T, ForEachArg>::m_bufSize - 1;
    do {
        pos = (pos + posDecr) % circular_queue<T, ForEachArg>::m_bufSize;
        T&& val = std::move(circular_queue<T, ForEachArg>::m_buffer[pos]);
        if (fun(val))
        {
            outPos1 = (outPos1 + posDecr) % circular_queue<T, ForEachArg>::m_bufSize;
            if (outPos1 != pos) circular_queue<T, ForEachArg>::m_buffer[outPos1] = std::move(val);
        }
    } while (pos != outPos);
    circular_queue<T, ForEachArg>::m_outPos.store(outPos1, std::memory_order_release);
    return true;
}

#endif // __circular_queue_h