circular_queue_mp.h 7.45 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
circular_queue_mp.h - Implementation of a lock-free circular queue for EspSoftwareSerial.
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/

#ifndef __circular_queue_mp_h
#define __circular_queue_mp_h

#include "circular_queue.h"

#ifdef ESP8266
#include "interrupts.h"
#else
#include <mutex>
#endif

/*!
    @brief	Instance class for a multi-producer, single-consumer circular queue / ring buffer (FIFO).
            This implementation is lock-free between producers and consumer for the available(), peek(),
            pop(), and push() type functions, but is guarded to safely allow only a single producer
            at any instant.
*/
template< typename T, typename ForEachArg = void >
class circular_queue_mp : protected circular_queue<T, ForEachArg>
{
public:
    circular_queue_mp() = default;
    circular_queue_mp(const size_t capacity) : circular_queue<T, ForEachArg>(capacity)
    {}
    circular_queue_mp(circular_queue<T, ForEachArg>&& cq) : circular_queue<T, ForEachArg>(std::move(cq))
    {}
    using circular_queue<T, ForEachArg>::operator=;
    using circular_queue<T, ForEachArg>::capacity;
    using circular_queue<T, ForEachArg>::flush;
    using circular_queue<T, ForEachArg>::available;
    using circular_queue<T, ForEachArg>::available_for_push;
    using circular_queue<T, ForEachArg>::peek;
    using circular_queue<T, ForEachArg>::pop;
    using circular_queue<T, ForEachArg>::pop_n;
    using circular_queue<T, ForEachArg>::for_each;
    using circular_queue<T, ForEachArg>::for_each_rev_requeue;

    /*!
        @brief	Resize the queue. The available elements in the queue are preserved.
                This is not lock-free, but safe, concurrent producer or consumer access
                is guarded.
        @return True if the new capacity could accommodate the present elements in
                the queue, otherwise nothing is done and false is returned.
    */
    bool capacity(const size_t cap)
    {
#ifdef ESP8266
        esp8266::InterruptLock lock;
#else
        std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
        return circular_queue<T, ForEachArg>::capacity(cap);
    }

    bool IRAM_ATTR push() = delete;

    /*!
        @brief	Move the rvalue parameter into the queue, guarded
                for multiple concurrent producers.
        @return true if the queue accepted the value, false if the queue
                was full.
    */
    bool IRAM_ATTR push(T&& val)
    {
#ifdef ESP8266
        esp8266::InterruptLock lock;
#else
        std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
        return circular_queue<T, ForEachArg>::push(std::move(val));
    }

    /*!
        @brief	Push a copy of the parameter into the queue, guarded
                for multiple concurrent producers.
        @return true if the queue accepted the value, false if the queue
                was full.
    */
    bool IRAM_ATTR push(const T& val)
    {
#ifdef ESP8266
        esp8266::InterruptLock lock;
#else
        std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
        return circular_queue<T, ForEachArg>::push(val);
    }

    /*!
        @brief	Push copies of multiple elements from a buffer into the queue,
                in order, beginning at buffer's head. This is guarded for
                multiple producers, push_n() is atomic.
        @return The number of elements actually copied into the queue, counted
                from the buffer head.
    */
    size_t push_n(const T* buffer, size_t size)
    {
#ifdef ESP8266
        esp8266::InterruptLock lock;
#else
        std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
        return circular_queue<T, ForEachArg>::push_n(buffer, size);
    }

    /*!
        @brief	Pops the next available element from the queue, requeues
                it immediately.
        @return A reference to the just requeued element, or the default
                value of type T if the queue is empty.
    */
    T& pop_requeue();

    /*!
        @brief	Iterate over, pop and optionally requeue each available element from the queue,
                calling back fun with a reference of every single element.
                Requeuing is dependent on the return boolean of the callback function. If it
                returns true, the requeue occurs.
    */
    bool for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun);

#ifndef ESP8266
protected:
    std::mutex m_pushMtx;
#endif
};

template< typename T, typename ForEachArg >
T& circular_queue_mp<T, ForEachArg>::pop_requeue()
{
#ifdef ESP8266
    esp8266::InterruptLock lock;
#else
    std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
    const auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_acquire);
    const auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (inPos == outPos) return circular_queue<T, ForEachArg>::defaultValue;
    T& val = circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
    const auto bufSize = circular_queue<T, ForEachArg>::m_bufSize;
    std::atomic_thread_fence(std::memory_order_release);
	circular_queue<T, ForEachArg>::m_outPos.store((outPos + 1) % bufSize, std::memory_order_relaxed);
	circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % bufSize, std::memory_order_release);
    return val;
}

template< typename T, typename ForEachArg >
bool circular_queue_mp<T, ForEachArg>::for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun)
{
    auto inPos0 = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_acquire);
    auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (outPos == inPos0) return false;
    do {
        T&& val = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
        if (fun(val))
        {
#ifdef ESP8266
            esp8266::InterruptLock lock;
#else
            std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
            std::atomic_thread_fence(std::memory_order_release);
            auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
            std::atomic_thread_fence(std::memory_order_acquire);
			circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(val);
            std::atomic_thread_fence(std::memory_order_release);
			circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % circular_queue<T, ForEachArg>::m_bufSize, std::memory_order_release);
        }
        else
        {
            std::atomic_thread_fence(std::memory_order_release);
        }
        outPos = (outPos + 1) % circular_queue<T, ForEachArg>::m_bufSize;
		circular_queue<T, ForEachArg>::m_outPos.store(outPos, std::memory_order_release);
    } while (outPos != inPos0);
    return true;
}

#endif // __circular_queue_mp_h