2
0
mirror of https://github.com/openvswitch/ovs synced 2025-10-25 15:07:05 +00:00

netlink: Make nl_dump_next() thread-safe.

This patch modifies 'struct nl_dump' and nl_dump_next() to allow
multiple threads to share the same nl_dump. The changes focus on
synchronizing dump status between multiple callers and on allowing
callers to fully process their existing buffers before determining
whether to stop fetching flows.

The 'status' field of 'struct nl_dump' becomes atomic, so that multiple
threads may check and/or update it to communicate when there is an error
or the netlink dump is finished. The low bit holds whether the final
message was seen, while the higher bits hold an errno value.

nl_dump_next() will now read all messages from the given buffer before
checking the shared error status and attempting to fetch more. Multiple
threads may call this with the same nl_dump, but each must provide its
own independent buffer. As before, the final dump status can be
determined by calling nl_dump_done() from a single thread.

Signed-off-by: Joe Stringer <joestringer@nicira.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
This commit is contained in:
Joe Stringer
2014-02-27 14:13:06 -08:00
committed by Ben Pfaff
parent d57695d77e
commit 0672776e58
2 changed files with 92 additions and 55 deletions

View File

@@ -31,6 +31,7 @@
#include "ofpbuf.h"
#include "ovs-thread.h"
#include "poll-loop.h"
#include "seq.h"
#include "socket-util.h"
#include "util.h"
#include "vlog.h"
@@ -690,43 +691,18 @@ nl_sock_drain(struct nl_sock *sock)
void
nl_dump_start(struct nl_dump *dump, int protocol, const struct ofpbuf *request)
{
dump->status = nl_pool_alloc(protocol, &dump->sock);
if (dump->status) {
int status = nl_pool_alloc(protocol, &dump->sock);
if (status) {
return;
}
nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_DUMP | NLM_F_ACK;
dump->status = nl_sock_send__(dump->sock, request,
nl_sock_allocate_seq(dump->sock, 1), true);
status = nl_sock_send__(dump->sock, request,
nl_sock_allocate_seq(dump->sock, 1), true);
atomic_init(&dump->status, status << 1);
dump->nl_seq = nl_msg_nlmsghdr(request)->nlmsg_seq;
}
/* Helper function for nl_dump_next(). */
static int
nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer)
{
struct nlmsghdr *nlmsghdr;
int retval;
retval = nl_sock_recv__(dump->sock, buffer, true);
if (retval) {
return retval == EINTR ? EAGAIN : retval;
}
nlmsghdr = nl_msg_nlmsghdr(buffer);
if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
nlmsghdr->nlmsg_seq, dump->nl_seq);
return EAGAIN;
}
if (nl_msg_nlmsgerr(buffer, &retval)) {
VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
ovs_strerror(retval));
return retval && retval != EAGAIN ? retval : EPROTO;
}
return 0;
dump->status_seq = seq_create();
}
/* Attempts to retrieve another reply from 'dump' into 'buffer'. 'dump' must
@@ -742,40 +718,83 @@ nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer)
* to 0. Failure might indicate an actual error or merely the end of replies.
* An error status for the entire dump operation is provided when it is
* completed by calling nl_dump_done().
*
* Multiple threads may call this function, passing the same nl_dump; however,
* each must provide an independent buffer. This function may cache multiple
* replies in the buffer, and these will be processed before more replies are
* fetched. When this function returns false, other threads may continue to
* process replies in their buffers, but they will not fetch more replies.
*/
bool
nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer)
{
struct nlmsghdr *nlmsghdr;
int error = 0;
reply->data = NULL;
reply->size = 0;
if (dump->status) {
return false;
}
/* If 'buffer' is empty, fetch another batch of nlmsgs. */
while (!buffer->size) {
int retval = nl_dump_recv(dump, buffer);
unsigned int status;
int retval, seq;
seq = seq_read(dump->status_seq);
atomic_read(&dump->status, &status);
if (status) {
return false;
}
retval = nl_sock_recv__(dump->sock, buffer, false);
if (retval) {
ofpbuf_clear(buffer);
if (retval != EAGAIN) {
dump->status = retval;
return false;
if (retval == EAGAIN) {
nl_sock_wait(dump->sock, POLLIN);
seq_wait(dump->status_seq, seq);
poll_block();
continue;
} else {
error = retval;
goto exit;
}
}
nlmsghdr = nl_msg_nlmsghdr(buffer);
if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
nlmsghdr->nlmsg_seq, dump->nl_seq);
ofpbuf_clear(buffer);
continue;
}
if (nl_msg_nlmsgerr(buffer, &retval) && retval) {
VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
ovs_strerror(retval));
error = retval == EAGAIN ? EPROTO : retval;
ofpbuf_clear(buffer);
goto exit;
}
}
/* Fetch the next nlmsg in the current batch. */
nlmsghdr = nl_msg_next(buffer, reply);
if (!nlmsghdr) {
VLOG_WARN_RL(&rl, "netlink dump reply contains message fragment");
dump->status = EPROTO;
return false;
error = EPROTO;
} else if (nlmsghdr->nlmsg_type == NLMSG_DONE) {
dump->status = EOF;
return false;
error = EOF;
}
return true;
exit:
if (error == EOF) {
unsigned int old;
atomic_or(&dump->status, 1, &old);
seq_change(dump->status_seq);
} else if (error) {
atomic_store(&dump->status, error << 1);
seq_change(dump->status_seq);
}
return !error;
}
/* Completes Netlink dump operation 'dump', which must have been initialized
@@ -784,23 +803,29 @@ nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer)
int
nl_dump_done(struct nl_dump *dump)
{
uint64_t tmp_reply_stub[NL_DUMP_BUFSIZE / 8];
struct ofpbuf buf;
int status;
/* Drain any remaining messages that the client didn't read. Otherwise the
* kernel will continue to queue them up and waste buffer space.
*
* XXX We could just destroy and discard the socket in this case. */
ofpbuf_use_stub(&buf, tmp_reply_stub, sizeof tmp_reply_stub);
while (!dump->status) {
struct ofpbuf reply;
if (!nl_dump_next(dump, &reply, &buf)) {
ovs_assert(dump->status);
atomic_read(&dump->status, &status);
if (!status) {
uint64_t tmp_reply_stub[NL_DUMP_BUFSIZE / 8];
struct ofpbuf reply, buf;
ofpbuf_use_stub(&buf, tmp_reply_stub, sizeof tmp_reply_stub);
while (nl_dump_next(dump, &reply, &buf)) {
/* Nothing to do. */
}
atomic_read(&dump->status, &status);
ovs_assert(status);
ofpbuf_uninit(&buf);
}
atomic_destroy(&dump->status);
nl_pool_release(dump->sock);
ofpbuf_uninit(&buf);
return dump->status == EOF ? 0 : dump->status;
seq_destroy(dump->status_seq);
return status >> 1;
}
/* Causes poll_block() to wake up when any of the specified 'events' (which is