diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 9270873c5..09220b639 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -15,7 +15,7 @@ */ #include -#include "dpif.h" +#include "dpif-netdev.h" #include #include @@ -71,7 +71,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev); #define NETDEV_RULE_PRIORITY 0x8000 #define FLOW_DUMP_MAX_BATCH 50 -#define NR_THREADS 1 /* Use per thread recirc_depth to prevent recirculation loop. */ #define MAX_RECIRC_DEPTH 5 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) @@ -344,8 +343,8 @@ static void dp_netdev_destroy_all_queues(struct dp_netdev *dp) OVS_REQ_WRLOCK(dp->queue_rwlock); static int dpif_netdev_open(const struct dpif_class *, const char *name, bool create, struct dpif **); -static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **, - int cnt, int queue_no, int type, +static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *, + int queue_no, int type, const struct miniflow *, const struct nlattr *userdata); static void dp_netdev_execute_actions(struct dp_netdev *dp, @@ -733,7 +732,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, if (netdev_is_pmd(netdev)) { dp->pmd_count++; - dp_netdev_set_pmd_threads(dp, NR_THREADS); + dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS); dp_netdev_reload_pmd_threads(dp); } ovs_refcount_init(&port->ref_cnt); @@ -2088,17 +2087,18 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, } if (OVS_UNLIKELY(!rules[i])) { + dp_netdev_count_packet(dp, DP_STAT_MISS, 1); + if (OVS_LIKELY(dp->handler_queues)) { uint32_t hash = miniflow_hash_5tuple(mfs[i], 0); struct ofpbuf *buf = &packets[i]->ofpbuf; - dp_netdev_output_userspace(dp, &buf, 1, hash % dp->n_handlers, + dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers, DPIF_UC_MISS, mfs[i], NULL); - } else { - /* No upcall queue. 
Freeing the packet */ - dpif_packet_delete(packets[i]); } + + dpif_packet_delete(packets[i]); continue; } @@ -2161,6 +2161,7 @@ OVS_REQUIRES(q->mutex) if (userdata) { buf_size += NLA_ALIGN(userdata->nla_len); } + buf_size += ofpbuf_size(packet); ofpbuf_init(buf, buf_size); /* Put ODP flow. */ @@ -2175,39 +2176,37 @@ OVS_REQUIRES(q->mutex) NLA_ALIGN(userdata->nla_len)); } - upcall->packet = *packet; + /* We have to perform a copy of the packet, because we cannot send DPDK + * mbufs to a non pmd thread. When the upcall processing will be done + * in the pmd thread, this copy can be avoided */ + ofpbuf_set_data(&upcall->packet, ofpbuf_put(buf, ofpbuf_data(packet), + ofpbuf_size(packet))); + ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet)); seq_change(q->seq); return 0; } else { - ofpbuf_delete(packet); return ENOBUFS; } - } static int -dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **packets, - int cnt, int queue_no, int type, +dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet, + int queue_no, int type, const struct miniflow *key, const struct nlattr *userdata) { struct dp_netdev_queue *q; int error; - int i; fat_rwlock_rdlock(&dp->queue_rwlock); q = &dp->handler_queues[queue_no]; ovs_mutex_lock(&q->mutex); - for (i = 0; i < cnt; i++) { - struct ofpbuf *packet = packets[i]; - - error = dp_netdev_queue_userspace_packet(q, packet, type, key, - userdata); - if (error == ENOBUFS) { - dp_netdev_count_packet(dp, DP_STAT_LOST, 1); - } + error = dp_netdev_queue_userspace_packet(q, packet, type, key, + userdata); + if (error == ENOBUFS) { + dp_netdev_count_packet(dp, DP_STAT_LOST, 1); } ovs_mutex_unlock(&q->mutex); fat_rwlock_unlock(&dp->queue_rwlock); @@ -2252,19 +2251,20 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt, miniflow_initialize(&key.flow, key.buf); for (i = 0; i < cnt; i++) { - struct ofpbuf *packet, *userspace_packet; + struct ofpbuf *packet; packet = &packets[i]->ofpbuf; miniflow_extract(packet, md, 
&key.flow); - userspace_packet = may_steal ? packet : ofpbuf_clone(packet); - - dp_netdev_output_userspace(aux->dp, &userspace_packet, 1, + dp_netdev_output_userspace(aux->dp, packet, miniflow_hash_5tuple(&key.flow, 0) % aux->dp->n_handlers, DPIF_UC_ACTION, &key.flow, userdata); + if (may_steal) { + dpif_packet_delete(packets[i]); + } } break; } diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h index af4a969a2..0f42d7a94 100644 --- a/lib/dpif-netdev.h +++ b/lib/dpif-netdev.h @@ -40,6 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b) } #define NR_QUEUE 1 +#define NR_PMD_THREADS 1 #ifdef __cplusplus } diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index b925dd28b..62c9a0c76 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -138,6 +138,11 @@ static struct list dpdk_list OVS_GUARDED_BY(dpdk_mutex) static struct list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex) = LIST_INITIALIZER(&dpdk_mp_list); +/* This mutex must be used by non pmd threads when allocating or freeing + * mbufs through mempools. 
Since dpdk_queue_pkts() and dpdk_queue_flush() may + * use mempools, a non pmd thread should hold this mutex while calling them */ +struct ovs_mutex nonpmd_mempool_mutex = OVS_MUTEX_INITIALIZER; + struct dpdk_mp { struct rte_mempool *mp; int mtu; @@ -200,6 +205,8 @@ struct netdev_rxq_dpdk { int port_id; }; +static bool thread_is_pmd(void); + static int netdev_dpdk_construct(struct netdev *); static bool @@ -223,13 +230,14 @@ dpdk_rte_mzalloc(size_t sz) return ptr; } +/* XXX this function should be called only by pmd threads (or by non pmd + * threads holding the nonpmd_mempool_mutex) */ void free_dpdk_buf(struct dpif_packet *p) { - struct ofpbuf *buf = &p->ofpbuf; - struct rte_mbuf *pkt = (struct rte_mbuf *) buf->dpdk_buf; + struct rte_mbuf *pkt = (struct rte_mbuf *) p; - rte_mempool_put(pkt->pool, pkt); + rte_pktmbuf_free_seg(pkt); } static void @@ -617,10 +625,13 @@ dpdk_queue_flush__(struct netdev_dpdk *dev, int qid) nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count); if (OVS_UNLIKELY(nb_tx != txq->count)) { - /* free buffers if we couldn't transmit packets */ - rte_mempool_put_bulk(dev->dpdk_mp->mp, - (void **) &txq->burst_pkts[nb_tx], - (txq->count - nb_tx)); + /* free buffers, which we couldn't transmit, one at a time (each + * packet could come from a different mempool) */ + int i; + + for (i = nb_tx; i < txq->count; i++) { + rte_pktmbuf_free_seg(txq->burst_pkts[i]); + } } txq->count = 0; txq->tsc = rte_get_timer_cycles(); @@ -697,6 +708,7 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid, /* Tx function. 
Transmit packets indefinitely */ static void dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt) + OVS_NO_THREAD_SAFETY_ANALYSIS { struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); struct rte_mbuf *mbufs[cnt]; @@ -704,6 +716,13 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt) int newcnt = 0; int i; + /* If we are on a non pmd thread we have to use the mempool mutex, because + * every non pmd thread shares the same mempool cache */ + + if (!thread_is_pmd()) { + ovs_mutex_lock(&nonpmd_mempool_mutex); + } + for (i = 0; i < cnt; i++) { int size = ofpbuf_size(&pkts[i]->ofpbuf); @@ -739,6 +758,10 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt) dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt); dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE); + + if (!thread_is_pmd()) { + ovs_mutex_unlock(&nonpmd_mempool_mutex); + } } static int @@ -1384,6 +1407,9 @@ dpdk_init(int argc, char **argv) argv[result] = argv[0]; } + /* We are called from the main thread here */ + thread_set_nonpmd(); + return result + 1; } @@ -1429,7 +1455,25 @@ pmd_thread_setaffinity_cpu(int cpu) VLOG_ERR("Thread affinity error %d",err); return err; } - RTE_PER_LCORE(_lcore_id) = cpu; + /* lcore_id 0 is reserved for use by non pmd threads. */ + RTE_PER_LCORE(_lcore_id) = cpu + 1; return 0; } + +void +thread_set_nonpmd(void) +{ + /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved + * for non pmd threads */ + BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE); + /* We have to use 0 to allow non pmd threads to perform certain DPDK + * operations, like rte_eth_dev_configure().
*/ + RTE_PER_LCORE(_lcore_id) = 0; +} + +static bool +thread_is_pmd(void) +{ + return rte_lcore_id() != 0; +} diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h index b021aa53e..e4ba6fc37 100644 --- a/lib/netdev-dpdk.h +++ b/lib/netdev-dpdk.h @@ -2,7 +2,6 @@ #define NETDEV_DPDK_H #include -#include "ofpbuf.h" struct dpif_packet; @@ -25,6 +24,7 @@ int dpdk_init(int argc, char **argv); void netdev_dpdk_register(void); void free_dpdk_buf(struct dpif_packet *); int pmd_thread_setaffinity_cpu(int cpu); +void thread_set_nonpmd(void); #else @@ -52,5 +52,11 @@ pmd_thread_setaffinity_cpu(int cpu OVS_UNUSED) return 0; } +static inline void +thread_set_nonpmd(void) +{ + /* Nothing */ +} + #endif /* DPDK_NETDEV */ #endif diff --git a/lib/ofpbuf.c b/lib/ofpbuf.c index cc0d741da..198bbf654 100644 --- a/lib/ofpbuf.c +++ b/lib/ofpbuf.c @@ -117,9 +117,6 @@ void ofpbuf_init_dpdk(struct ofpbuf *b, size_t allocated) { ofpbuf_init__(b, allocated, OFPBUF_DPDK); -#ifdef DPDK_NETDEV - b->dpdk_buf = b; -#endif } /* Initializes 'b' as an empty ofpbuf with an initial capacity of 'size' @@ -139,7 +136,6 @@ ofpbuf_uninit(struct ofpbuf *b) free(ofpbuf_base(b)); } else if (b->source == OFPBUF_DPDK) { #ifdef DPDK_NETDEV - ovs_assert(b != b->dpdk_buf); /* If this ofpbuf was allocated by DPDK it must have been * created as a dpif_packet */ free_dpdk_buf((struct dpif_packet*) b); diff --git a/lib/ofpbuf.h b/lib/ofpbuf.h index eed5ca8a9..adaf52673 100644 --- a/lib/ofpbuf.h +++ b/lib/ofpbuf.h @@ -59,7 +59,6 @@ enum OVS_PACKED_ENUM ofpbuf_source { struct ofpbuf { #ifdef DPDK_NETDEV struct rte_mbuf mbuf; /* DPDK mbuf */ - void *dpdk_buf; /* First byte of allocated DPDK buffer. */ #else void *base_; /* First byte of allocated space. */ void *data_; /* First byte actually in use. 
*/ diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c index cbee582aa..fe6fb4372 100644 --- a/lib/ovs-thread.c +++ b/lib/ovs-thread.c @@ -25,6 +25,7 @@ #include #include "compiler.h" #include "hash.h" +#include "netdev-dpdk.h" #include "ovs-rcu.h" #include "poll-loop.h" #include "seq.h" @@ -326,6 +327,8 @@ ovsthread_wrapper(void *aux_) set_subprogram_name("%s%u", aux.name, id); ovsrcu_quiesce_end(); + thread_set_nonpmd(); + return aux.start(aux.arg); }