
dpif-netdev: Polling threads directly call ofproto upcall functions.

Typically, kernel datapath threads send upcalls to userspace where
handler threads process the upcalls. For TAP and DPDK devices, the
datapath threads operate in userspace, so there is no need for
separate handler threads.

This patch allows userspace datapath threads to directly call the
ofproto upcall functions, eliminating the need for handler threads
for datapaths of type 'netdev'.

Signed-off-by: Ryan Wilson <wryan@nicira.com>
Signed-off-by: Ethan Jackson <ethan@nicira.com>
Acked-by: Ethan Jackson <ethan@nicira.com>
Author:       Ryan Wilson
Date:         2014-07-26 06:51:55 +00:00
Committed by: Ethan Jackson
Parent:       5fdbade3c1
Commit:       6b31e07347

8 changed files with 374 additions and 390 deletions

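The mechanism is visible in the diff below: instead of queueing upcalls for handler threads to pick up via dpif_netdev_recv(), the datapath stores a callback pointer (an exec_upcall_cb registered through dpif_netdev_register_upcall_cb) and the polling thread invokes it directly, gated by a new upcall_rwlock that revalidator synchronization can write-lock to fence upcalls off. What follows is a minimal, self-contained sketch of that pattern, not the real OVS code: struct upcall, dp_register_upcall_cb, dp_execute_upcalls, and print_upcalls are hypothetical stand-ins, and the callback signature is simplified (the patch's exec_upcall_cb takes a struct dpif *, arrays of dpif_upcall and ofpbuf, and a count).

/* Sketch of the direct-upcall pattern this patch introduces.
 * Simplified stand-in types; only the names upcall_rwlock,
 * upcall_cb, and exec_upcall_cb mirror the patch itself. */
#include <pthread.h>
#include <stdio.h>

struct upcall { const char *packet; };

/* The datapath hands a whole batch of upcalls to the ofproto
 * layer in a single call. */
typedef void exec_upcall_cb(struct upcall *upcalls, int cnt);

struct datapath {
    pthread_rwlock_t upcall_rwlock;  /* Write-locked => upcalls disabled. */
    exec_upcall_cb *upcall_cb;       /* Registered by the ofproto layer. */
};

static void
dp_register_upcall_cb(struct datapath *dp, exec_upcall_cb *cb)
{
    dp->upcall_cb = cb;   /* Analogous to dpif_netdev_register_upcall_cb(). */
}

static void
dp_execute_upcalls(struct datapath *dp, struct upcall *upcalls, int cnt)
{
    /* Mirrors dp_netdev_execute_userspace_queue(): try the read lock;
     * if a writer holds upcall_rwlock, upcalls are disabled, so drop
     * the batch rather than block the polling thread. */
    if (pthread_rwlock_tryrdlock(&dp->upcall_rwlock) == 0) {
        dp->upcall_cb(upcalls, cnt);
        pthread_rwlock_unlock(&dp->upcall_rwlock);
    } else {
        printf("upcalls disabled, dropping %d packets\n", cnt);
    }
}

static void
print_upcalls(struct upcall *upcalls, int cnt)   /* Toy callback. */
{
    for (int i = 0; i < cnt; i++) {
        printf("upcall: %s\n", upcalls[i].packet);
    }
}

int
main(void)
{
    struct datapath dp;
    pthread_rwlock_init(&dp.upcall_rwlock, NULL);
    dp_register_upcall_cb(&dp, print_upcalls);

    struct upcall batch[2] = { { "miss #1" }, { "miss #2" } };
    dp_execute_upcalls(&dp, batch, 2);         /* Enabled: delivered. */

    pthread_rwlock_wrlock(&dp.upcall_rwlock);  /* Disable upcalls. */
    dp_execute_upcalls(&dp, batch, 2);         /* Disabled: dropped. */
    pthread_rwlock_unlock(&dp.upcall_rwlock);
    return 0;
}

Note the design choice the try-lock encodes: a polling (PMD) thread must never sleep on the upcall path, so when upcalls are disabled the batch is freed and counted as lost rather than queued, which is why the per-handler queues, their mutexes, and queue_rwlock can all be deleted below.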

@@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
/* Queues. */
enum { MAX_QUEUE_LEN = 128 }; /* Maximum number of packets per queue. */
enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
/* Protects against changes to 'dp_netdevs'. */
static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
@@ -90,26 +85,14 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
= SHASH_INITIALIZER(&dp_netdevs);
struct dp_netdev_upcall {
struct dpif_upcall upcall; /* Queued upcall information. */
struct ofpbuf buf; /* ofpbuf instance for upcall.packet. */
struct dp_netdev_queue {
unsigned int packet_count;
struct dpif_upcall upcalls[NETDEV_MAX_RX_BATCH];
struct ofpbuf bufs[NETDEV_MAX_RX_BATCH];
};
/* A queue passing packets from a struct dp_netdev to its clients (handlers).
 *
* Thread-safety
* =============
*
* Any access at all requires the owning 'dp_netdev''s queue_rwlock and
* its own mutex. */
struct dp_netdev_queue {
struct ovs_mutex mutex;
struct seq *seq; /* Incremented whenever a packet is queued. */
struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
unsigned int head OVS_GUARDED;
unsigned int tail OVS_GUARDED;
};
#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
/* Datapath based on the network device interface from netdev.h.
*
@@ -125,11 +108,11 @@ struct dp_netdev_queue {
* dp_netdev_mutex (global)
* port_mutex
* flow_mutex
* queue_rwlock
*/
struct dp_netdev {
const struct dpif_class *const class;
const char *const name;
struct dpif *dpif;
struct ovs_refcount ref_cnt;
atomic_flag destroyed;
@@ -142,15 +125,6 @@ struct dp_netdev {
struct classifier cls;
struct cmap flow_table OVS_GUARDED; /* Flow table. */
/* Queues.
*
* 'queue_rwlock' protects the modification of 'handler_queues' and
* 'n_handlers'. The queue elements are protected by its
* 'handler_queues''s mutex. */
struct fat_rwlock queue_rwlock;
struct dp_netdev_queue *handler_queues;
uint32_t n_handlers;
/* Statistics.
*
* ovsthread_stats is internally synchronized. */
@@ -163,6 +137,11 @@ struct dp_netdev {
struct cmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
/* Protects access to ofproto-dpif-upcall interface during revalidator
* thread synchronization. */
struct fat_rwlock upcall_rwlock;
exec_upcall_cb *upcall_cb; /* Callback function for executing upcalls. */
/* Forwarding threads. */
struct latch exit_latch;
struct pmd_thread *pmd_threads;
@@ -339,14 +318,14 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
OVS_REQUIRES(dp->port_mutex);
static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
OVS_REQUIRES(dp->port_mutex);
static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
OVS_REQ_WRLOCK(dp->queue_rwlock);
static int dpif_netdev_open(const struct dpif_class *, const char *name,
bool create, struct dpif **);
static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
int queue_no, int type,
const struct miniflow *,
const struct nlattr *userdata);
static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
struct ofpbuf *, int type,
const struct miniflow *,
const struct nlattr *);
static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
struct dp_netdev *);
static void dp_netdev_execute_actions(struct dp_netdev *dp,
struct dpif_packet **, int c,
bool may_steal, struct pkt_metadata *,
@@ -357,6 +336,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
odp_port_t port_no);
static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
static void dp_netdev_disable_upcall(struct dp_netdev *);
static struct dpif_netdev *
dpif_netdev_cast(const struct dpif *dpif)
@@ -484,14 +464,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
classifier_init(&dp->cls, NULL);
cmap_init(&dp->flow_table);
fat_rwlock_init(&dp->queue_rwlock);
ovsthread_stats_init(&dp->stats);
ovs_mutex_init(&dp->port_mutex);
cmap_init(&dp->ports);
dp->port_seq = seq_create();
latch_init(&dp->exit_latch);
fat_rwlock_init(&dp->upcall_rwlock);
/* Disable upcalls by default. */
dp_netdev_disable_upcall(dp);
dp->upcall_cb = NULL;
ovs_mutex_lock(&dp->port_mutex);
error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -523,31 +506,13 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
}
if (!error) {
*dpifp = create_dpif_netdev(dp);
dp->dpif = *dpifp;
}
ovs_mutex_unlock(&dp_netdev_mutex);
return error;
}
static void
dp_netdev_purge_queues(struct dp_netdev *dp)
OVS_REQ_WRLOCK(dp->queue_rwlock)
{
int i;
for (i = 0; i < dp->n_handlers; i++) {
struct dp_netdev_queue *q = &dp->handler_queues[i];
ovs_mutex_lock(&q->mutex);
while (q->tail != q->head) {
struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
ofpbuf_uninit(&u->upcall.packet);
ofpbuf_uninit(&u->buf);
}
ovs_mutex_unlock(&q->mutex);
}
}
/* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
* through the 'dp_netdevs' shash while freeing 'dp'. */
static void
@@ -576,17 +541,12 @@ dp_netdev_free(struct dp_netdev *dp)
}
ovsthread_stats_destroy(&dp->stats);
fat_rwlock_wrlock(&dp->queue_rwlock);
dp_netdev_destroy_all_queues(dp);
fat_rwlock_unlock(&dp->queue_rwlock);
fat_rwlock_destroy(&dp->queue_rwlock);
classifier_destroy(&dp->cls);
cmap_destroy(&dp->flow_table);
ovs_mutex_destroy(&dp->flow_mutex);
seq_destroy(dp->port_seq);
cmap_destroy(&dp->ports);
fat_rwlock_destroy(&dp->upcall_rwlock);
latch_destroy(&dp->exit_latch);
free(CONST_CAST(char *, dp->name));
free(dp);
@@ -1559,80 +1519,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
return 0;
}
static void
dp_netdev_destroy_all_queues(struct dp_netdev *dp)
OVS_REQ_WRLOCK(dp->queue_rwlock)
{
size_t i;
dp_netdev_purge_queues(dp);
for (i = 0; i < dp->n_handlers; i++) {
struct dp_netdev_queue *q = &dp->handler_queues[i];
ovs_mutex_destroy(&q->mutex);
seq_destroy(q->seq);
}
free(dp->handler_queues);
dp->handler_queues = NULL;
dp->n_handlers = 0;
}
static void
dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
OVS_REQ_WRLOCK(dp->queue_rwlock)
{
if (dp->n_handlers != n_handlers) {
size_t i;
dp_netdev_destroy_all_queues(dp);
dp->n_handlers = n_handlers;
dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
for (i = 0; i < n_handlers; i++) {
struct dp_netdev_queue *q = &dp->handler_queues[i];
ovs_mutex_init(&q->mutex);
q->seq = seq_create();
}
}
}
static int
dpif_netdev_recv_set(struct dpif *dpif, bool enable)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
if ((dp->handler_queues != NULL) == enable) {
return 0;
}
fat_rwlock_wrlock(&dp->queue_rwlock);
if (!enable) {
dp_netdev_destroy_all_queues(dp);
} else {
dp_netdev_refresh_queues(dp, 1);
}
fat_rwlock_unlock(&dp->queue_rwlock);
return 0;
}
static int
dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
fat_rwlock_wrlock(&dp->queue_rwlock);
if (dp->handler_queues) {
dp_netdev_refresh_queues(dp, n_handlers);
}
fat_rwlock_unlock(&dp->queue_rwlock);
return 0;
}
static int
dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
uint32_t queue_id, uint32_t *priority)
@@ -1641,97 +1527,6 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
return 0;
}
static bool
dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
OVS_REQ_RDLOCK(dp->queue_rwlock)
{
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
if (!dp->handler_queues) {
VLOG_WARN_RL(&rl, "receiving upcall disabled");
return false;
}
if (handler_id >= dp->n_handlers) {
VLOG_WARN_RL(&rl, "handler index out of bound");
return false;
}
return true;
}
static int
dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
struct dpif_upcall *upcall, struct ofpbuf *buf)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_queue *q;
int error = 0;
fat_rwlock_rdlock(&dp->queue_rwlock);
if (!dp_netdev_recv_check(dp, handler_id)) {
error = EAGAIN;
goto out;
}
q = &dp->handler_queues[handler_id];
ovs_mutex_lock(&q->mutex);
if (q->head != q->tail) {
struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
*upcall = u->upcall;
ofpbuf_uninit(buf);
*buf = u->buf;
} else {
error = EAGAIN;
}
ovs_mutex_unlock(&q->mutex);
out:
fat_rwlock_unlock(&dp->queue_rwlock);
return error;
}
static void
dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_queue *q;
uint64_t seq;
fat_rwlock_rdlock(&dp->queue_rwlock);
if (!dp_netdev_recv_check(dp, handler_id)) {
goto out;
}
q = &dp->handler_queues[handler_id];
ovs_mutex_lock(&q->mutex);
seq = seq_read(q->seq);
if (q->head != q->tail) {
poll_immediate_wake();
} else {
seq_wait(q->seq, seq);
}
ovs_mutex_unlock(&q->mutex);
out:
fat_rwlock_unlock(&dp->queue_rwlock);
}
static void
dpif_netdev_recv_purge(struct dpif *dpif)
{
struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
dp_netdev_purge_queues(dpif_netdev->dp);
fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
}
/* Creates and returns a new 'struct dp_netdev_actions', with a reference count
of 1, whose actions are a copy of the 'ofpacts_len' bytes of
@@ -1918,6 +1713,36 @@ reload:
return NULL;
}
static void
dp_netdev_disable_upcall(struct dp_netdev *dp)
OVS_ACQUIRES(dp->upcall_rwlock)
{
fat_rwlock_wrlock(&dp->upcall_rwlock);
}
static void
dpif_netdev_disable_upcall(struct dpif *dpif)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct dp_netdev *dp = get_dp_netdev(dpif);
dp_netdev_disable_upcall(dp);
}
static void
dp_netdev_enable_upcall(struct dp_netdev *dp)
OVS_RELEASES(dp->upcall_rwlock)
{
fat_rwlock_unlock(&dp->upcall_rwlock);
}
static void
dpif_netdev_enable_upcall(struct dpif *dpif)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct dp_netdev *dp = get_dp_netdev(dpif);
dp_netdev_enable_upcall(dp);
}
static void
dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
{
@@ -2056,6 +1881,7 @@ static void
dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
struct pkt_metadata *md)
{
struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
struct packet_batch batches[NETDEV_MAX_RX_BATCH];
struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
@@ -2087,17 +1913,11 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
}
if (OVS_UNLIKELY(!rules[i])) {
struct ofpbuf *buf = &packets[i]->ofpbuf;
dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
if (OVS_LIKELY(dp->handler_queues)) {
uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
struct ofpbuf *buf = &packets[i]->ofpbuf;
dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers,
DPIF_UC_MISS, mfs[i], NULL);
}
dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
mfs[i], NULL);
dpif_packet_delete(packets[i]);
continue;
}
@@ -2127,6 +1947,10 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
for (i = 0; i < n_batches; i++) {
packet_batch_execute(&batches[i], dp);
}
if (q.packet_count) {
dp_netdev_execute_userspace_queue(&q, dp);
}
}
static void
@@ -2145,12 +1969,11 @@ dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
struct ofpbuf *packet, int type,
const struct miniflow *key,
const struct nlattr *userdata)
OVS_REQUIRES(q->mutex)
{
if (q->head - q->tail < MAX_QUEUE_LEN) {
struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
struct dpif_upcall *upcall = &u->upcall;
struct ofpbuf *buf = &u->buf;
if (q->packet_count < NETDEV_MAX_RX_BATCH) {
int cnt = q->packet_count;
struct dpif_upcall *upcall = &q->upcalls[cnt];
struct ofpbuf *buf = &q->bufs[cnt];
size_t buf_size;
struct flow flow;
void *data;
@@ -2174,7 +1997,7 @@ OVS_REQUIRES(q->mutex)
/* Put userdata. */
if (userdata) {
upcall->userdata = ofpbuf_put(buf, userdata,
                                          NLA_ALIGN(userdata->nla_len));
}
/* We have to perform a copy of the packet, because we cannot send DPDK
@@ -2184,41 +2007,46 @@ OVS_REQUIRES(q->mutex)
ofpbuf_use_stub(&upcall->packet, data, ofpbuf_size(packet));
ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
seq_change(q->seq);
q->packet_count++;
return 0;
} else {
return ENOBUFS;
}
}
static int
dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
int queue_no, int type,
const struct miniflow *key,
const struct nlattr *userdata)
static void
dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
struct dp_netdev *dp)
{
struct dp_netdev_queue *q;
int error;
struct dpif_upcall *upcalls = q->upcalls;
struct ofpbuf *bufs = q->bufs;
int cnt = q->packet_count;
fat_rwlock_rdlock(&dp->queue_rwlock);
q = &dp->handler_queues[queue_no];
ovs_mutex_lock(&q->mutex);
error = dp_netdev_queue_userspace_packet(q, packet, type, key,
userdata);
if (error == ENOBUFS) {
dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
ovs_assert(dp->upcall_cb);
dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
fat_rwlock_unlock(&dp->upcall_rwlock);
} else {
int i;
for (i = 0; i < cnt; i++) {
ofpbuf_uninit(&bufs[i]);
ofpbuf_uninit(&upcalls[i].packet);
}
}
ovs_mutex_unlock(&q->mutex);
fat_rwlock_unlock(&dp->queue_rwlock);
return error;
}
struct dp_netdev_execute_aux {
struct dp_netdev *dp;
};
static void
dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
dp->upcall_cb = cb;
}
static void
dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
struct pkt_metadata *md,
@@ -2246,6 +2074,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
case OVS_ACTION_ATTR_USERSPACE: {
const struct nlattr *userdata;
struct netdev_flow_key key;
struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
@@ -2258,15 +2087,17 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
miniflow_extract(packet, md, &key.flow);
dp_netdev_output_userspace(aux->dp, packet,
miniflow_hash_5tuple(&key.flow, 0)
% aux->dp->n_handlers,
DPIF_UC_ACTION, &key.flow,
userdata);
dp_netdev_queue_userspace_packet(&q, packet,
DPIF_UC_ACTION, &key.flow,
userdata);
if (may_steal) {
dpif_packet_delete(packets[i]);
}
}
if (q.packet_count) {
dp_netdev_execute_userspace_queue(&q, aux->dp);
}
break;
}
@@ -2392,12 +2223,15 @@ const struct dpif_class dpif_netdev_class = {
dpif_netdev_flow_dump_next,
dpif_netdev_execute,
NULL, /* operate */
dpif_netdev_recv_set,
dpif_netdev_handlers_set,
NULL, /* recv_set */
NULL, /* handlers_set */
dpif_netdev_queue_to_priority,
dpif_netdev_recv,
dpif_netdev_recv_wait,
dpif_netdev_recv_purge,
NULL, /* recv */
NULL, /* recv_wait */
NULL, /* recv_purge */
dpif_netdev_register_upcall_cb,
dpif_netdev_enable_upcall,
dpif_netdev_disable_upcall,
};
static void