
dpif-netdev: Implement the API functions to allow multiple handler threads to read upcalls.

This commit implements the API functions that allow multiple handler
threads to read upcalls.
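
As an illustration of how these functions are meant to be driven, here is a
minimal sketch of a handler thread's receive loop against the dpif layer.
'struct handler_aux', process_upcall(), and the loop itself are hypothetical
scaffolding, not code from this commit; only dpif_recv(), dpif_recv_wait(),
and poll_block() are the real entry points:

/* Illustrative handler loop, not part of this commit: each handler
 * thread owns one 'handler_id' and therefore drains only its own
 * dp_netdev_queue. */
struct handler_aux {
    struct dpif *dpif;
    uint32_t handler_id;
};

static void *
handler_main(void *aux_)
{
    struct handler_aux *aux = aux_;

    for (;;) {
        struct dpif_upcall upcall;
        struct ofpbuf buf;
        int error;

        ofpbuf_init(&buf, 0);
        error = dpif_recv(aux->dpif, aux->handler_id, &upcall, &buf);
        if (!error) {
            process_upcall(&upcall);   /* hypothetical consumer */
        } else if (error == EAGAIN) {
            dpif_recv_wait(aux->dpif, aux->handler_id);
            poll_block();   /* sleeps until this queue's seq changes */
        }
        ofpbuf_uninit(&buf);
    }
    return NULL;
}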

Also, this commit removes the handling priority of DPIF_UC_MISS
over DPIF_UC_ACTION, so both kinds of upcall are now placed on the
same queue.  This decision reflects how much has changed since the
days when flow setup rate was the most treasured metric; starving
all actions whenever any flow misses are present no longer seems
like a sound balancing policy.

Thus, the current implementation will be put through testing, and
the search for a better balancing solution will continue if an
issue turns up.

Also note that the introduction and use of flow_hash_5tuple() can
place missed ICMP packets from the same source, but with different
type/code, onto different handler queues.  This may reorder such
packets.  For now, we do not consider this a problem.
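
The mechanism behind that note can be seen in a small standalone sketch:
a 5-tuple hash covers addresses, protocol, and the transport-port fields,
and for ICMP the flow's port fields carry type and code.  'struct mini_flow'
and toy_hash_5tuple() below are made-up stand-ins for OVS's flow structure
and flow_hash_5tuple(), but the selection logic is the same: hash the
5-tuple, then take it modulo the handler count.

#include <stdint.h>
#include <stdio.h>

struct mini_flow {                 /* hypothetical, trimmed-down 5-tuple */
    uint32_t nw_src, nw_dst;       /* IPv4 source/destination */
    uint8_t nw_proto;              /* 1 = ICMP */
    uint16_t tp_src, tp_dst;       /* L4 ports; ICMP type/code live here */
};

/* FNV-1a-style stand-in for flow_hash_5tuple(); the real function and
 * its distribution differ, but both mix every 5-tuple field. */
static uint32_t
toy_hash_5tuple(const struct mini_flow *f, uint32_t basis)
{
    uint32_t fields[5] = { f->nw_src, f->nw_dst, f->nw_proto,
                           f->tp_src, f->tp_dst };
    uint32_t h = basis ^ 2166136261u;
    int i;

    for (i = 0; i < 5; i++) {
        h = (h ^ fields[i]) * 16777619u;
    }
    return h;
}

int
main(void)
{
    const uint32_t n_handlers = 4;
    /* Same source and destination, both ICMP: echo request (type 8)
     * vs. time exceeded (type 11).  Only the type differs, yet each
     * may hash to a different handler queue, hence the possible
     * reordering mentioned above. */
    struct mini_flow echo = { 0x0a000001, 0x0a000002, 1, 8, 0 };
    struct mini_flow ttlx = { 0x0a000001, 0x0a000002, 1, 11, 0 };

    printf("echo request  -> queue %u\n",
           (unsigned) (toy_hash_5tuple(&echo, 0) % n_handlers));
    printf("time exceeded -> queue %u\n",
           (unsigned) (toy_hash_5tuple(&ttlx, 0) % n_handlers));
    return 0;
}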

Signed-off-by: Alex Wang <alexw@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
Author: Alex Wang <alexw@nicira.com>
Date:   2014-02-26 10:07:38 -08:00
Commit: 63be20bee2 (parent 1954e6bbcb)

3 changed files with 182 additions and 64 deletions

lib/dpif-netdev.c

@@ -74,7 +74,6 @@ enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
 
 /* Queues. */
-enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
@@ -91,14 +90,17 @@ struct dp_netdev_upcall {
     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
 };
 
-/* A queue passing packets from a struct dp_netdev to its clients.
+/* A queue passing packets from a struct dp_netdev to its clients (handlers).
  *
  *
  * Thread-safety
  * =============
  *
- * Any access at all requires the owning 'dp_netdev''s queue_mutex. */
+ * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
+ * its own mutex. */
 struct dp_netdev_queue {
+    struct ovs_mutex mutex;
+    struct seq *seq;            /* Incremented whenever a packet is queued. */
     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
     unsigned int head OVS_GUARDED;
     unsigned int tail OVS_GUARDED;
@@ -119,7 +121,7 @@ struct dp_netdev_queue {
  *    port_rwlock
  *    flow_mutex
  *    cls.rwlock
- *    queue_mutex
+ *    queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
@@ -141,10 +143,12 @@ struct dp_netdev {
 
     /* Queues.
      *
-     * Everything in 'queues' is protected by 'queue_mutex'. */
-    struct ovs_mutex queue_mutex;
-    struct dp_netdev_queue queues[N_QUEUES];
-    struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
+     * 'queue_rwlock' protects the modification of 'handler_queues' and
+     * 'n_handlers'.  The queue elements are protected by its
+     * 'handler_queues''s mutex. */
+    struct fat_rwlock queue_rwlock;
+    struct dp_netdev_queue *handler_queues;
+    uint32_t n_handlers;
 
     /* Statistics.
      *
@@ -329,12 +333,15 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
     OVS_REQ_WRLOCK(dp->port_rwlock);
 static int do_del_port(struct dp_netdev *dp, odp_port_t port_no)
     OVS_REQ_WRLOCK(dp->port_rwlock);
+static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                      int queue_no, const struct flow *,
-                                      const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex);
+                                      int queue_no, int type,
+                                      const struct flow *,
+                                      const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_rwlock);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       const struct flow *, struct ofpbuf *,
                                       struct pkt_metadata *,
@@ -452,7 +459,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 {
     struct dp_netdev *dp;
     int error;
-    int i;
 
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
@@ -466,13 +472,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
-    ovs_mutex_init(&dp->queue_mutex);
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        dp->queues[i].head = dp->queues[i].tail = 0;
-    }
-    ovs_mutex_unlock(&dp->queue_mutex);
-    dp->queue_seq = seq_create();
+    fat_rwlock_init(&dp->queue_rwlock);
 
     ovsthread_stats_init(&dp->stats);
@@ -520,20 +520,21 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
 static void
 dp_netdev_purge_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
 {
     int i;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
+    for (i = 0; i < dp->n_handlers; i++) {
+        struct dp_netdev_queue *q = &dp->handler_queues[i];
 
+        ovs_mutex_lock(&q->mutex);
         while (q->tail != q->head) {
             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
             ofpbuf_uninit(&u->upcall.packet);
             ofpbuf_uninit(&u->buf);
         }
+        ovs_mutex_unlock(&q->mutex);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
@@ -564,9 +565,11 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovsthread_stats_destroy(&dp->stats);
 
-    dp_netdev_purge_queues(dp);
-    seq_destroy(dp->queue_seq);
-    ovs_mutex_destroy(&dp->queue_mutex);
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    dp_netdev_destroy_all_queues(dp);
+    fat_rwlock_unlock(&dp->queue_rwlock);
+    fat_rwlock_destroy(&dp->queue_rwlock);
+
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
@@ -1472,16 +1475,77 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
-static int
-dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
+static void
+dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
+{
+    size_t i;
+
+    dp_netdev_purge_queues(dp);
+
+    for (i = 0; i < dp->n_handlers; i++) {
+        struct dp_netdev_queue *q = &dp->handler_queues[i];
+
+        ovs_mutex_destroy(&q->mutex);
+        seq_destroy(q->seq);
+    }
+    free(dp->handler_queues);
+    dp->handler_queues = NULL;
+    dp->n_handlers = 0;
+}
+
+static void
+dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
+{
+    if (dp->n_handlers != n_handlers) {
+        size_t i;
+
+        dp_netdev_destroy_all_queues(dp);
+
+        dp->n_handlers = n_handlers;
+        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
+
+        for (i = 0; i < n_handlers; i++) {
+            struct dp_netdev_queue *q = &dp->handler_queues[i];
+
+            ovs_mutex_init(&q->mutex);
+            q->seq = seq_create();
+        }
+    }
+}
+
+static int
+dpif_netdev_recv_set(struct dpif *dpif, bool enable)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    if ((dp->handler_queues != NULL) == enable) {
+        return 0;
+    }
+
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    if (!enable) {
+        dp_netdev_destroy_all_queues(dp);
+    } else {
+        dp_netdev_refresh_queues(dp, 1);
+    }
+    fat_rwlock_unlock(&dp->queue_rwlock);
+
     return 0;
 }
 
 static int
-dpif_netdev_handlers_set(struct dpif *dpif OVS_UNUSED,
-                         uint32_t n_handlers OVS_UNUSED)
+dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    if (dp->handler_queues) {
+        dp_netdev_refresh_queues(dp, n_handlers);
+    }
+    fat_rwlock_unlock(&dp->queue_rwlock);
+
     return 0;
 }
@@ -1493,62 +1557,86 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
     return 0;
 }
 
-static struct dp_netdev_queue *
-find_nonempty_queue(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->queue_mutex)
+static bool
+dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
+    OVS_REQ_RDLOCK(dp->queue_rwlock)
 {
-    int i;
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
-        if (q->head != q->tail) {
-            return q;
-        }
+    if (!dp->handler_queues) {
+        VLOG_WARN_RL(&rl, "receiving upcall disabled");
+        return false;
     }
-    return NULL;
+
+    if (handler_id >= dp->n_handlers) {
+        VLOG_WARN_RL(&rl, "handler index out of bound");
+        return false;
+    }
+
+    return true;
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t n_handlers OVS_UNUSED,
+dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
                  struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
-    int error;
+    int error = 0;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    q = find_nonempty_queue(dp);
-    if (q) {
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+
+    if (!dp_netdev_recv_check(dp, handler_id)) {
+        error = EAGAIN;
+        goto out;
+    }
+
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    if (q->head != q->tail) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
         *upcall = u->upcall;
 
         ofpbuf_uninit(buf);
         *buf = u->buf;
-
-        error = 0;
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+
+out:
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
 
 static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
+dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_queue *q;
     uint64_t seq;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dp)) {
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+
+    if (!dp_netdev_recv_check(dp, handler_id)) {
+        goto out;
+    }
+
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    seq = seq_read(q->seq);
+    if (q->head != q->tail) {
         poll_immediate_wake();
     } else {
-        seq_wait(dp->queue_seq, seq);
+        seq_wait(q->seq, seq);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+
+out:
+    fat_rwlock_unlock(&dp->queue_rwlock);
 }
 
 static void
@@ -1556,7 +1644,9 @@ dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
 
+    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
     dp_netdev_purge_queues(dpif_netdev->dp);
+    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
 }
 
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1776,29 +1866,33 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
         dp_netdev_execute_actions(dp, &key, packet, md,
                                   actions->actions, actions->size);
         dp_netdev_count_packet(dp, DP_STAT_HIT);
-    } else {
+    } else if (dp->handler_queues) {
         dp_netdev_count_packet(dp, DP_STAT_MISS);
-        dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
+        dp_netdev_output_userspace(dp, packet,
+                                   flow_hash_5tuple(&key, 0) % dp->n_handlers,
+                                   DPIF_UC_MISS, &key, NULL);
     }
 }
 
 static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, const struct flow *flow,
+                           int queue_no, int type, const struct flow *flow,
                            const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex)
+    OVS_EXCLUDED(dp->queue_rwlock)
 {
-    struct dp_netdev_queue *q = &dp->queues[queue_no];
+    struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp->queue_mutex);
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+    q = &dp->handler_queues[queue_no];
+    ovs_mutex_lock(&q->mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
         struct ofpbuf *buf = &u->buf;
         size_t buf_size;
 
-        upcall->type = queue_no;
+        upcall->type = type;
 
         /* Allocate buffer big enough for everything. */
         buf_size = ODPUTIL_FLOW_KEY_BYTES;
@@ -1823,14 +1917,15 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
         upcall->packet = *packet;
         ofpbuf_use(packet, NULL, 0);
 
-        seq_change(dp->queue_seq);
+        seq_change(q->seq);
 
         error = 0;
     } else {
         dp_netdev_count_packet(dp, DP_STAT_LOST);
         error = ENOBUFS;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
@@ -1867,7 +1962,10 @@ dp_execute_cb(void *aux_, struct ofpbuf *packet,
         if (!may_steal) {
             packet = ofpbuf_clone_with_headroom(packet, DP_NETDEV_HEADROOM);
         }
-        dp_netdev_output_userspace(aux->dp, packet, DPIF_UC_ACTION, aux->key,
+        dp_netdev_output_userspace(aux->dp, packet,
+                                   flow_hash_5tuple(aux->key, 0)
+                                       % aux->dp->n_handlers,
+                                   DPIF_UC_ACTION, aux->key,
                                    userdata);
         if (!may_steal) {
             ofpbuf_uninit(packet);
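
One detail the diff leaves untouched is worth spelling out: 'head' and 'tail'
in struct dp_netdev_queue are free-running unsigned counters, indexed into
'upcalls[]' with '& QUEUE_MASK', which is why BUILD_ASSERT_DECL(IS_POW2(
MAX_QUEUE_LEN)) guards the queue size.  Below is a self-contained sketch of
the same pattern; the names are illustrative, not from the OVS tree:

/* Free-running-counter ring buffer, the pattern dp_netdev_queue uses:
 * head and tail only ever increment; wrap-around is handled by masking
 * with (LEN - 1), which requires LEN to be a power of two.  Unsigned
 * overflow is well defined, so head - tail stays the fill level even
 * after the counters wrap. */
#include <assert.h>
#include <stdio.h>

#define RING_LEN  128               /* must be a power of two */
#define RING_MASK (RING_LEN - 1)

struct ring {
    int items[RING_LEN];
    unsigned int head, tail;        /* free-running; head - tail = fill */
};

static int
ring_put(struct ring *r, int item)
{
    if (r->head - r->tail >= RING_LEN) {
        return -1;                  /* full: caller counts a drop */
    }
    r->items[r->head++ & RING_MASK] = item;
    return 0;
}

static int
ring_get(struct ring *r, int *item)
{
    if (r->head == r->tail) {
        return -1;                  /* empty */
    }
    *item = r->items[r->tail++ & RING_MASK];
    return 0;
}

int
main(void)
{
    struct ring r = { .head = 0, .tail = 0 };
    int v;

    ring_put(&r, 42);
    assert(ring_get(&r, &v) == 0 && v == 42);
    printf("ok: %d\n", v);
    return 0;
}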