ovs/lib/mpsc-queue.c
Gaetan Rivet 5396ba5b21 mpsc-queue: Module for lock-free message passing.
Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
intrusive, unbounded queue that does not require deferred memory
management.

The queue is designed to improve performance in this specific MPSC setup.  A
benchmark accompanies the unit tests to measure the difference in this
configuration.  A single reader thread polls the queue while N writer threads
enqueue elements as fast as possible.  The mpsc-queue is compared against the
regular ovs-list as well as the guarded list.  The latter usually offers a
slight improvement by batching element removal; however, the mpsc-queue is
faster.

The 'Avg' column is the average of the producer threads' times:

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
   Benchmarking n=3000000 on 1 + 1 threads.
    type\thread:  Reader      1    Avg
     mpsc-queue:     167    167    167 ms
     list(spin):      89     80     80 ms
    list(mutex):     745    745    745 ms
   guarded list:     788    788    788 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
   Benchmarking n=3000000 on 1 + 2 threads.
    type\thread:  Reader      1      2    Avg
     mpsc-queue:      98     97     94     95 ms
     list(spin):     185    171    173    172 ms
    list(mutex):     203    199    203    201 ms
   guarded list:     269    269    188    228 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
   Benchmarking n=3000000 on 1 + 3 threads.
    type\thread:  Reader      1      2      3    Avg
     mpsc-queue:      76     76     65     76     72 ms
     list(spin):     246    110    240    238    196 ms
    list(mutex):     542    541    541    539    540 ms
   guarded list:     535    535    507    511    517 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
   Benchmarking n=3000000 on 1 + 4 threads.
    type\thread:  Reader      1      2      3      4    Avg
     mpsc-queue:      73     68     68     68     68     68 ms
     list(spin):     294    275    279    277    282    278 ms
    list(mutex):     346    309    287    345    302    310 ms
   guarded list:     378    319    334    378    351    345 ms

Signed-off-by: Gaetan Rivet <grive@u256.net>
Reviewed-by: Eli Britstein <elibr@nvidia.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
2022-01-18 19:30:17 +01:00

/*
 * Copyright (c) 2021 NVIDIA Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "ovs-atomic.h"
#include "mpsc-queue.h"

/* Multi-producer, single-consumer queue
 * =====================================
 *
 * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
 *
 * One atomic exchange operation is done per insertion.  Removal in most
 * cases does not require an atomic operation; a single atomic exchange is
 * used to close the queue chain.
 *
 * Insertion
 * =========
 *
 * The queue is implemented as a linked list.  Insertion is done at the back
 * of the queue, by atomically swapping the current end with the new node,
 * then pointing the previous end to the new node.  Following Vyukov's
 * nomenclature, the end node of the chain is called the head.  A producer
 * only ever manipulates the head.
 *
 * The head swap is atomic; however, the link from the previous head to the
 * new one is done in a separate operation.  This means that the chain is
 * momentarily broken: the new head has been inserted while the previous
 * head's next pointer is still NULL.
 *
 * Considering a series of insertions, the queue state remains consistent and
 * the insertion order is compatible with their precedence, so the queue is
 * serializable.  However, because an insertion consists of two separate
 * memory transactions, it is not linearizable.
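 *
 * As an illustration, an insertion (mirroring mpsc_queue_insert() below)
 * boils down to two steps, with the broken-chain window between them:
 *
 *     node->next = NULL;
 *     prev = XCHG(&queue->head, node);  // Atomic: 'node' is the new head,
 *                                       // but prev->next is still NULL.
 *     prev->next = node;                // The chain is consistent again.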
 *
 * Removal
 * =======
 *
 * The consumer must deal with this queue inconsistency.  It manipulates the
 * tail of the queue, moving it forward along the consumed elements.  When
 * the end of the chain of elements is found (the next pointer is NULL), the
 * tail is compared with the head.
 *
 * If both point to different addresses, then the queue is in an inconsistent
 * state: the tail cannot move forward, as its next pointer is NULL, but the
 * head is not the last element in the chain.  This can only happen if the
 * chain is broken.
 *
 * In this case, the consumer must wait for the producer to finish writing
 * the next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
 *
 * Removal is thus, in most cases (when there are elements in the queue),
 * accomplished without using atomics, until the last element of the queue.
 * There, the head is atomically loaded.  If the queue is in a consistent
 * state, the head is moved back to the queue stub by inserting the stub in
 * the queue: ending the queue is the same as an insertion, which is one
 * atomic XCHG.
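 *
 * A consumer loop might handle the three poll results as follows (a sketch
 * only; 'queue' is a 'struct mpsc_queue *' and the consumer is assumed to
 * hold 'read_lock'):
 *
 *     struct mpsc_queue_node *node;
 *
 *     switch (mpsc_queue_poll(queue, &node)) {
 *     case MPSC_QUEUE_ITEM:
 *         // 'node' was removed from the queue: consume it.
 *         break;
 *     case MPSC_QUEUE_RETRY:
 *         // A producer is mid-insertion: poll again later.
 *         break;
 *     case MPSC_QUEUE_EMPTY:
 *         // Nothing left to read.
 *         break;
 *     }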
 *
 * Forward guarantees
 * ==================
 *
 * Insertion and peeking are wait-free: they will execute in a known, bounded
 * number of instructions, regardless of the state of the queue.
 *
 * However, while removal consists of a peek and a constant-time write to
 * update the tail, it can repeatedly fail until the queue becomes
 * consistent.  It is thus dependent on other threads progressing.  This
 * means that the queue's forward progress is only obstruction-free.  It has
 * a potential for livelocking.
 *
 * The chain will remain broken as long as a producer has not finished
 * writing its next pointer.  If a producer is cancelled, for example, the
 * queue could remain broken for all future reads.  This queue should either
 * be used with cooperative threads, or insertions must only be done outside
 * cancellable sections, as in the sketch below.
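 *
 * For instance, a cancellable producer thread might disable cancellation
 * around the insertion (a sketch only; error handling omitted):
 *
 *     int old;
 *
 *     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
 *     mpsc_queue_insert(queue, node);
 *     pthread_setcancelstate(old, NULL);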
 *
 * Performance
 * ===========
 *
 * In benchmarks this structure was better than alternatives such as:
 *
 *   * A reversed Treiber stack [2], using 1 CAS per operation
 *     and requiring reversal of the node list on removal.
 *
 *   * The Michael-Scott lock-free queue [3], using 2 CAS per operation.
 *
 * While it is not linearizable, this queue is well-suited for message
 * passing.  If a proper hardware XCHG operation is used, it scales better
 * than CAS-based implementations.
 *
 * References
 * ==========
 *
 * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 *
 * [2]: R. K. Treiber.  Systems programming: Coping with parallelism.
 *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
 *
 * [3]: M. M. Michael and M. L. Scott.  Simple, Fast, and Practical
 *      Non-Blocking and Blocking Concurrent Queue Algorithms.
 *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
 */

/* Initialize 'queue' to an empty state: both head and tail point at the
 * internal stub node. */
void
mpsc_queue_init(struct mpsc_queue *queue)
{
    atomic_store_relaxed(&queue->head, &queue->stub);
    atomic_store_relaxed(&queue->tail, &queue->stub);
    atomic_store_relaxed(&queue->stub.next, NULL);

    ovs_mutex_init(&queue->read_lock);
}

void
mpsc_queue_destroy(struct mpsc_queue *queue)
    OVS_EXCLUDED(queue->read_lock)
{
    ovs_mutex_destroy(&queue->read_lock);
}

enum mpsc_queue_poll_result
mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;
    struct mpsc_queue_node *head;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        if (next == NULL) {
            return MPSC_QUEUE_EMPTY;
        }

        /* Skip the stub: it is never returned to the user. */
        atomic_store_relaxed(&queue->tail, next);
        tail = next;
        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    }

    if (next != NULL) {
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
    if (tail != head) {
        /* The chain is broken: a producer has swapped the head but has not
         * yet written the link from the previous head. */
        return MPSC_QUEUE_RETRY;
    }

    /* 'tail' is the last element: close the chain by re-inserting the stub,
     * which is one atomic XCHG. */
    mpsc_queue_insert(queue, &queue->stub);

    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
    if (next != NULL) {
        atomic_store_relaxed(&queue->tail, next);
        *node = tail;
        return MPSC_QUEUE_ITEM;
    }

    return MPSC_QUEUE_EMPTY;
}

/* Poll repeatedly until an item is obtained or the queue is seen empty.
 * This may spin while a producer is in the middle of an insertion. */
struct mpsc_queue_node *
mpsc_queue_pop(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    enum mpsc_queue_poll_result result;
    struct mpsc_queue_node *node;

    do {
        result = mpsc_queue_poll(queue, &node);
        if (result == MPSC_QUEUE_EMPTY) {
            return NULL;
        }
    } while (result == MPSC_QUEUE_RETRY);

    return node;
}

/* Insert 'node' at the front (tail end) of 'queue'.  Only the consumer may
 * do this, as it is the only thread allowed to manipulate the tail. */
void
mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_store_relaxed(&node->next, tail);
    atomic_store_relaxed(&queue->tail, node);
}

/* Return the first element of 'queue' without removing it, skipping over
 * the stub, or NULL if the queue is empty. */
struct mpsc_queue_node *
mpsc_queue_tail(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *tail;
    struct mpsc_queue_node *next;

    atomic_read_relaxed(&queue->tail, &tail);
    atomic_read_explicit(&tail->next, &next, memory_order_acquire);

    if (tail == &queue->stub) {
        if (next == NULL) {
            return NULL;
        }

        atomic_store_relaxed(&queue->tail, next);
        tail = next;
    }

    return tail;
}

/* Get the next element of a node, skipping over the stub. */
struct mpsc_queue_node *
mpsc_queue_next(struct mpsc_queue *queue,
                struct mpsc_queue_node *prev)
    OVS_REQUIRES(queue->read_lock)
{
    struct mpsc_queue_node *next;

    atomic_read_explicit(&prev->next, &next, memory_order_acquire);
    if (next == &queue->stub) {
        atomic_read_explicit(&next->next, &next, memory_order_acquire);
    }

    return next;
}

/* Producer-side insertion: one atomic exchange publishes 'node' as the new
 * head, then a release store links the previous head to it.  Between the
 * two operations, the chain is momentarily broken. */
void
mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
{
    struct mpsc_queue_node *prev;

    atomic_store_relaxed(&node->next, NULL);
    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
    atomic_store_explicit(&prev->next, node, memory_order_release);
}