diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index cd72e6242..8c87c0532 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
+struct rxq_poll {
+    struct dp_netdev_port *port;
+    struct netdev_rxq *rx;
+    struct ovs_list node;
+};
+
 /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing. Therefore netdev can
  * not implement rx-wait for these devices. dpif-netdev needs to poll
@@ -430,6 +437,11 @@ struct dp_netdev_pmd_thread {
     int tx_qid;                     /* Queue id used by this pmd thread to
                                      * send packets on all netdevs */
 
+    struct ovs_mutex poll_mutex;    /* Mutex for poll_list. */
+    /* List of rx queues to poll. */
+    struct ovs_list poll_list OVS_GUARDED;
+    int poll_cnt;                   /* Number of elements in poll_list. */
+
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
      * values and subtracts them from 'stats' and 'cycles' before
@@ -469,7 +481,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -482,6 +494,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx);
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -1025,18 +1042,6 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->cond_mutex);
 }
 
-/* Causes all pmd threads to reload its tx/rx devices.
- * Must be called after adding/removing ports. */
-static void
-dp_netdev_reload_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_reload_pmd__(pmd);
-    }
-}
-
 static uint32_t
 hash_port_no(odp_port_t port_no)
 {
@@ -1128,8 +1133,26 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
     if (netdev_is_pmd(netdev)) {
-        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
-        dp_netdev_reload_pmds(dp);
+        int numa_id = netdev_get_numa_id(netdev);
+        struct dp_netdev_pmd_thread *pmd;
+
+        /* Cannot create pmd threads for invalid numa node. */
+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+
+        for (i = 0; i < netdev_n_rxq(netdev); i++) {
+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+            if (!pmd) {
+                /* There are no pmd threads on this numa node. */
+                dp_netdev_set_pmds_on_numa(dp, numa_id);
+                /* Assigning of rx queues done. */
+                break;
+            }
+
+            ovs_mutex_lock(&pmd->poll_mutex);
+            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+            ovs_mutex_unlock(&pmd->poll_mutex);
+            dp_netdev_reload_pmd__(pmd);
+        }
     }
 
     seq_change(dp->port_seq);
@@ -1226,16 +1249,6 @@ port_ref(struct dp_netdev_port *port)
     }
 }
 
-static bool
-port_try_ref(struct dp_netdev_port *port)
-{
-    if (port) {
-        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
-    }
-
-    return false;
-}
-
 static void
 port_unref(struct dp_netdev_port *port)
 {
@@ -1313,12 +1326,37 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     if (netdev_is_pmd(port->netdev)) {
         int numa_id = netdev_get_numa_id(port->netdev);
 
+        /* PMD threads cannot be on invalid numa node. */
+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
         /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa. Else, just reloads the queues. */
+         * for that numa. Else, deletes the queues from polling lists. */
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
+        } else {
+            struct dp_netdev_pmd_thread *pmd;
+            struct rxq_poll *poll, *next;
+
+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+                if (pmd->numa_id == numa_id) {
+                    bool found = false;
+
+                    ovs_mutex_lock(&pmd->poll_mutex);
+                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+                        if (poll->port == port) {
+                            found = true;
+                            port_unref(poll->port);
+                            list_remove(&poll->node);
+                            pmd->poll_cnt--;
+                            free(poll);
+                        }
+                    }
+                    ovs_mutex_unlock(&pmd->poll_mutex);
+                    if (found) {
+                        dp_netdev_reload_pmd__(pmd);
+                    }
+                }
+            }
         }
-        dp_netdev_reload_pmds(dp);
     }
 
     port_unref(port);
@@ -2583,56 +2621,29 @@ dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
-struct rxq_poll {
-    struct dp_netdev_port *port;
-    struct netdev_rxq *rx;
-};
-
 static int
 pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 struct rxq_poll **ppoll_list, int poll_cnt)
+    OVS_REQUIRES(pmd->poll_mutex)
 {
     struct rxq_poll *poll_list = *ppoll_list;
-    struct dp_netdev_port *port;
-    int n_pmds_on_numa, index, i;
+    struct rxq_poll *poll;
+    int i;
 
-    /* Simple scheduler for netdev rx polling. */
     for (i = 0; i < poll_cnt; i++) {
         port_unref(poll_list[i].port);
     }
 
-    poll_cnt = 0;
-    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
-    index = 0;
+    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
 
-    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
-        /* Calls port_try_ref() to prevent the main thread
-         * from deleting the port. */
-        if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)
-                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
-                int i;
-
-                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % n_pmds_on_numa) == pmd->index) {
-                        poll_list = xrealloc(poll_list,
-                                             sizeof *poll_list * (poll_cnt + 1));
-
-                        port_ref(port);
-                        poll_list[poll_cnt].port = port;
-                        poll_list[poll_cnt].rx = port->rxq[i];
-                        poll_cnt++;
-                    }
-                    index++;
-                }
-            }
-            /* Unrefs the port_try_ref(). */
-            port_unref(port);
-        }
+    i = 0;
+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+        port_ref(poll->port);
+        poll_list[i++] = *poll;
     }
 
     *ppoll_list = poll_list;
-    return poll_cnt;
+    return pmd->poll_cnt;
 }
 
 static void *
@@ -2653,11 +2664,15 @@ pmd_thread_main(void *f_)
     pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
     emc_cache_init(&pmd->flow_cache);
+
+    ovs_mutex_lock(&pmd->poll_mutex);
     poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
+    ovs_mutex_unlock(&pmd->poll_mutex);
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
-       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
+        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
+                  netdev_get_name(poll_list[i].port->netdev));
     }
 
     /* Signal here to make sure the pmd finishes
@@ -2665,8 +2680,6 @@ reload:
     dp_netdev_pmd_reload_done(pmd);
 
     for (;;) {
-        int i;
-
         for (i = 0; i < poll_cnt; i++) {
             dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
@@ -2695,7 +2708,7 @@ reload:
     }
 
     for (i = 0; i < poll_cnt; i++) {
-         port_unref(poll_list[i].port);
+        port_unref(poll_list[i].port);
     }
 
     dp_netdev_pmd_reload_done(pmd);
@@ -2734,7 +2747,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
     dp_netdev_enable_upcall(dp);
 }
 
-void
+static void
 dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
 {
     ovs_mutex_lock(&pmd->cond_mutex);
@@ -2827,6 +2840,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->tx_qid = core_id_to_qid(core_id);
     pmd->numa_id = numa_id;
+    pmd->poll_cnt = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -2834,8 +2848,10 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     xpthread_cond_init(&pmd->cond, NULL);
     ovs_mutex_init(&pmd->cond_mutex);
     ovs_mutex_init(&pmd->flow_mutex);
+    ovs_mutex_init(&pmd->poll_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
+    list_init(&pmd->poll_list);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -2855,6 +2871,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     latch_destroy(&pmd->exit_latch);
     xpthread_cond_destroy(&pmd->cond);
     ovs_mutex_destroy(&pmd->cond_mutex);
+    ovs_mutex_destroy(&pmd->poll_mutex);
     free(pmd);
 }
 
@@ -2863,6 +2880,8 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 static void
 dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
+    struct rxq_poll *poll;
+
     /* Uninit the 'flow_cache' since there is
      * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
@@ -2873,6 +2892,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
+
+    /* Unref all ports and free poll_list. */
+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+        port_unref(poll->port);
+        free(poll);
+    }
+
     /* Purges the 'pmd''s flows after stopping the thread, but before
      * destroying the flows, so that the flow stats can be collected. */
     if (dp->dp_purge_cb) {
@@ -2906,6 +2932,42 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     }
 }
 
+/* Returns the PMD thread on this numa node with the fewest rx queues to
+ * poll. Returns NULL if there are no PMD threads on this numa node.
+ * Can be called safely only by main thread. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int min_cnt = -1;
+    struct dp_netdev_pmd_thread *pmd, *res = NULL;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id
+            && (min_cnt > pmd->poll_cnt || res == NULL)) {
+            min_cnt = pmd->poll_cnt;
+            res = pmd;
+        }
+    }
+
+    return res;
+}
+
+/* Adds rx queue to poll_list of PMD thread. */
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx)
+    OVS_REQUIRES(pmd->poll_mutex)
+{
+    struct rxq_poll *poll = xmalloc(sizeof *poll);
+
+    port_ref(port);
+    poll->port = port;
+    poll->rx = rx;
+
+    list_push_back(&pmd->poll_list, &poll->node);
+    pmd->poll_cnt++;
+}
+
 /* Checks the numa node id of 'netdev' and starts pmd threads for
  * the numa node. */
 static void
@@ -2925,8 +2987,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
      * in which 'netdev' is on, do nothing. Else, creates the
      * pmd threads for the numa node. */
     if (!n_pmds) {
-        int can_have, n_unpinned, i;
+        int can_have, n_unpinned, i, index = 0;
         struct dp_netdev_pmd_thread **pmds;
+        struct dp_netdev_port *port;
 
         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
         if (!n_unpinned) {
@@ -2944,13 +3007,23 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
             pmds[i] = xzalloc(sizeof **pmds);
             dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
         }
-        /* The pmd thread code needs to see all the others configured pmd
-         * threads on the same numa node. That's why we call
-         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
-         * start them. */
+
+        /* Distributes rx queues of this numa node between new pmd threads. */
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == numa_id) {
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    /* Make thread-safety analyser happy. */
+                    ovs_mutex_lock(&pmds[index]->poll_mutex);
+                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
+                    ovs_mutex_unlock(&pmds[index]->poll_mutex);
+                    index = (index + 1) % can_have;
+                }
+            }
+        }
+
+        /* Actual start of pmd threads. */
         for (i = 0; i < can_have; i++) {
-            /* Each thread will distribute all devices rx-queues among
-             * themselves. */
             pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
         }
         free(pmds);
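
For illustration (not part of the patch): the change moves rx queue assignment out of each pmd thread's old modulo-based scan in pmd_load_queues() and into the main thread, with two policies: round-robin distribution over freshly created pmd threads in dp_netdev_set_pmds_on_numa(), and least-loaded placement via dp_netdev_less_loaded_pmd_on_numa() when a port is hot-added in do_add_port(). The standalone C sketch below shows how the two policies interact; 'toy_pmd', 'less_loaded', and the queue counts are hypothetical stand-ins for this example, not OVS APIs.

/* Toy model of the patch's two rx queue assignment policies. */
#include <stdio.h>

#define N_PMDS 3

struct toy_pmd {
    int id;
    int poll_cnt;   /* Mirrors dp_netdev_pmd_thread's poll_cnt. */
};

/* Least-loaded selection: the same linear scan as
 * dp_netdev_less_loaded_pmd_on_numa(). */
static struct toy_pmd *
less_loaded(struct toy_pmd *pmds, int n)
{
    struct toy_pmd *res = NULL;
    int min_cnt = -1;

    for (int i = 0; i < n; i++) {
        if (min_cnt > pmds[i].poll_cnt || !res) {
            min_cnt = pmds[i].poll_cnt;
            res = &pmds[i];
        }
    }
    return res;
}

int
main(void)
{
    struct toy_pmd pmds[N_PMDS] = { {0, 0}, {1, 0}, {2, 0} };
    int index = 0;

    /* Initial distribution: round-robin over new pmd threads,
     * as in dp_netdev_set_pmds_on_numa(). */
    for (int rxq = 0; rxq < 4; rxq++) {
        pmds[index].poll_cnt++;
        printf("initial: rxq %d -> pmd %d\n", rxq, pmds[index].id);
        index = (index + 1) % N_PMDS;
    }

    /* Hot-added port: each queue goes to the currently least loaded
     * pmd thread, as in do_add_port(). */
    for (int rxq = 0; rxq < 2; rxq++) {
        struct toy_pmd *pmd = less_loaded(pmds, N_PMDS);
        pmd->poll_cnt++;
        printf("hot-add: rxq %d -> pmd %d\n", rxq, pmd->id);
    }
    return 0;
}

Running the sketch, the round-robin pass leaves counts of 2/1/1, and the hot-add pass then fills in the less loaded pmds (1, then 2), which is the balancing behavior the per-pmd poll_cnt counter exists to support.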