2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-31 14:25:26 +00:00

dpif-netdev: Add per-pmd flow-table/classifier.

This commit changes the flow-table/classifier from being per
dpif-netdev datapath to being per pmd-thread.  As a direct
benefit, datapath and flow statistics no longer need to be
protected by a mutex or declared as per-thread variables,
since they are only written by the owning pmd thread.

As a side effect, the flow-dump output of the userspace datapath
can contain overlapping flows.  To reduce confusion, the dumps
from different pmd threads will be separated by a title line.
In addition, the flow operations via 'ovs-appctl dpctl/*'
are modified so that if the given flow's in_port corresponds
to a dpdk interface, the operation will be applied to all
pmd threads receiving from that interface (except for flow-get,
which will always be applied to non-pmd threads).

Signed-off-by: Alex Wang <alexw@nicira.com>
Tested-by: Mark D. Gray <mark.d.gray@intel.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
This commit is contained in:
Alex Wang
2014-10-12 18:18:47 -07:00
parent 9da2564e2b
commit 1c1e46ed84
7 changed files with 448 additions and 259 deletions

View File

@@ -31,6 +31,7 @@
#include "dirs.h"
#include "dpctl.h"
#include "dpif.h"
#include "dpif-netdev.h"
#include "dynamic-string.h"
#include "flow.h"
#include "match.h"
@@ -39,6 +40,7 @@
#include "odp-util.h"
#include "ofp-parse.h"
#include "ofpbuf.h"
#include "ovs-numa.h"
#include "packets.h"
#include "shash.h"
#include "simap.h"
@@ -729,7 +731,7 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p)
struct dpif_flow_dump_thread *flow_dump_thread;
struct dpif_flow_dump *flow_dump;
struct dpif_flow f;
int pmd_id = PMD_ID_NULL;
int error;
if (argc > 1 && !strncmp(argv[argc - 1], "filter=", 7)) {
@@ -792,6 +794,18 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p)
minimatch_destroy(&minimatch);
}
ds_clear(&ds);
/* If 'pmd_id' is specified, overlapping flows could be dumped from
* different pmd threads. So, separates dumps from different pmds
* by printing a title line. */
if (pmd_id != f.pmd_id) {
if (f.pmd_id == NON_PMD_CORE_ID) {
ds_put_format(&ds, "flow-dump from non-dpdk interfaces:\n");
} else {
ds_put_format(&ds, "flow-dump from pmd on cpu core: %d\n",
f.pmd_id);
}
pmd_id = f.pmd_id;
}
format_dpif_flow(&ds, &f, &portno_names, dpctl_p);
dpctl_print(dpctl_p, "%s\n", ds_cstr(&ds));
}
@@ -813,12 +827,43 @@ out_freefilter:
return error;
}
/* Extracts the in_port from the parsed keys, and returns the reference
 * to the 'struct netdev *' of the dpif port.  Returns NULL if 'key' has
 * no OVS_KEY_ATTR_IN_PORT attribute, if the port-number lookup fails, or
 * if opening the netdev fails.
 * Users must call 'netdev_close()' after finish using the returned
 * reference. */
static struct netdev *
get_in_port_netdev_from_key(struct dpif *dpif, const struct ofpbuf *key)
{
const struct nlattr *in_port_nla;
struct netdev *dev = NULL;
/* Search the top level (nesting depth 0) of the key attributes. */
in_port_nla = nl_attr_find(key, 0, OVS_KEY_ATTR_IN_PORT);
if (in_port_nla) {
struct dpif_port dpif_port;
odp_port_t port_no;
int error;
port_no = ODP_PORT_C(nl_attr_get_u32(in_port_nla));
error = dpif_port_query_by_number(dpif, port_no, &dpif_port);
if (error) {
goto out;
}
/* NOTE(review): netdev_open()'s return value is ignored here;
 * presumably it leaves 'dev' NULL on failure, which matches this
 * function's NULL-on-error contract -- TODO confirm. */
netdev_open(dpif_port.name, dpif_port.type, &dev);
dpif_port_destroy(&dpif_port);
}
out:
return dev;
}
static int
dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags,
struct dpctl_params *dpctl_p)
{
const char *key_s = argv[argc - 2];
const char *actions_s = argv[argc - 1];
struct netdev *in_port_netdev = NULL;
struct dpif_flow_stats stats;
struct dpif_port dpif_port;
struct dpif_port_dump port_dump;
@@ -873,13 +918,40 @@ dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags,
dpctl_error(dpctl_p, error, "parsing actions");
goto out_freeactions;
}
error = dpif_flow_put(dpif, flags,
ofpbuf_data(&key), ofpbuf_size(&key),
ofpbuf_size(&mask) == 0 ? NULL : ofpbuf_data(&mask),
ofpbuf_size(&mask),
ofpbuf_data(&actions), ofpbuf_size(&actions),
ufid_present ? &ufid : NULL,
dpctl_p->print_statistics ? &stats : NULL);
/* For DPDK interface, applies the operation to all pmd threads
* on the same numa node. */
in_port_netdev = get_in_port_netdev_from_key(dpif, &key);
if (in_port_netdev && netdev_is_pmd(in_port_netdev)) {
int numa_id;
numa_id = netdev_get_numa_id(in_port_netdev);
if (ovs_numa_numa_id_is_valid(numa_id)) {
struct ovs_numa_dump *dump = ovs_numa_dump_cores_on_numa(numa_id);
struct ovs_numa_info *iter;
FOR_EACH_CORE_ON_NUMA (iter, dump) {
if (ovs_numa_core_is_pinned(iter->core_id)) {
error = dpif_flow_put(dpif, flags,
ofpbuf_data(&key), ofpbuf_size(&key),
ofpbuf_size(&mask) == 0 ? NULL : ofpbuf_data(&mask),
ofpbuf_size(&mask), ofpbuf_data(&actions),
ofpbuf_size(&actions), ufid_present ? &ufid : NULL,
iter->core_id, dpctl_p->print_statistics ? &stats : NULL);
}
}
ovs_numa_dump_destroy(dump);
} else {
error = EINVAL;
}
} else {
error = dpif_flow_put(dpif, flags,
ofpbuf_data(&key), ofpbuf_size(&key),
ofpbuf_size(&mask) == 0 ? NULL : ofpbuf_data(&mask),
ofpbuf_size(&mask), ofpbuf_data(&actions),
ofpbuf_size(&actions), ufid_present ? &ufid : NULL,
PMD_ID_NULL, dpctl_p->print_statistics ? &stats : NULL);
}
if (error) {
dpctl_error(dpctl_p, error, "updating flow table");
goto out_freeactions;
@@ -900,6 +972,7 @@ out_freekeymask:
ofpbuf_uninit(&mask);
ofpbuf_uninit(&key);
dpif_close(dpif);
netdev_close(in_port_netdev);
return error;
}
@@ -964,7 +1037,9 @@ dpctl_get_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p)
goto out;
}
error = dpif_flow_get(dpif, NULL, 0, &ufid, &buf, &flow);
/* Does not work for DPDK, since do not know which 'pmd' to apply the
* operation. So, just uses PMD_ID_NULL. */
error = dpif_flow_get(dpif, NULL, 0, &ufid, PMD_ID_NULL, &buf, &flow);
if (error) {
dpctl_error(dpctl_p, error, "getting flow");
goto out;
@@ -987,6 +1062,7 @@ static int
dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p)
{
const char *key_s = argv[argc - 1];
struct netdev *in_port_netdev = NULL;
struct dpif_flow_stats stats;
struct dpif_port dpif_port;
struct dpif_port_dump port_dump;
@@ -1034,10 +1110,33 @@ dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p)
goto out;
}
error = dpif_flow_del(dpif,
ofpbuf_data(&key), ofpbuf_size(&key),
ufid_present ? &ufid : NULL,
dpctl_p->print_statistics ? &stats : NULL);
/* For DPDK interface, applies the operation to all pmd threads
* on the same numa node. */
in_port_netdev = get_in_port_netdev_from_key(dpif, &key);
if (in_port_netdev && netdev_is_pmd(in_port_netdev)) {
int numa_id;
numa_id = netdev_get_numa_id(in_port_netdev);
if (ovs_numa_numa_id_is_valid(numa_id)) {
struct ovs_numa_dump *dump = ovs_numa_dump_cores_on_numa(numa_id);
struct ovs_numa_info *iter;
FOR_EACH_CORE_ON_NUMA (iter, dump) {
if (ovs_numa_core_is_pinned(iter->core_id)) {
error = dpif_flow_del(dpif, ofpbuf_data(&key),
ofpbuf_size(&key), ufid_present ? &ufid : NULL,
iter->core_id, dpctl_p->print_statistics ? &stats : NULL);
}
}
ovs_numa_dump_destroy(dump);
} else {
error = EINVAL;
}
} else {
error = dpif_flow_del(dpif, ofpbuf_data(&key), ofpbuf_size(&key),
ufid_present ? &ufid : NULL, PMD_ID_NULL,
dpctl_p->print_statistics ? &stats : NULL);
}
if (error) {
dpctl_error(dpctl_p, error, "deleting flow");
if (error == ENOENT && !ufid_present) {
@@ -1065,6 +1164,7 @@ out:
ofpbuf_uninit(&key);
simap_destroy(&port_names);
dpif_close(dpif);
netdev_close(in_port_netdev);
return error;
}

View File

@@ -177,7 +177,6 @@ static bool dpcls_lookup(const struct dpcls *cls,
*
* dp_netdev_mutex (global)
* port_mutex
* flow_mutex
*/
struct dp_netdev {
const struct dpif_class *const class;
@@ -186,20 +185,6 @@ struct dp_netdev {
struct ovs_refcount ref_cnt;
atomic_flag destroyed;
/* Flows.
*
* Writers of 'flow_table' must take the 'flow_mutex'. Corresponding
* changes to 'cls' must be made while still holding the 'flow_mutex'.
*/
struct ovs_mutex flow_mutex;
struct dpcls cls;
struct cmap flow_table OVS_GUARDED; /* Flow table. */
/* Statistics.
*
* ovsthread_stats is internally synchronized. */
struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
/* Ports.
*
* Protected by RCU. Take the mutex to add or remove ports. */
@@ -241,15 +226,6 @@ enum dp_stat_type {
DP_N_STATS
};
/* Contained by struct dp_netdev's 'stats' member. */
struct dp_netdev_stats {
struct ovs_mutex mutex; /* Protects 'n'. */
/* Indexed by DP_STAT_*, protected by 'mutex'. */
unsigned long long int n[DP_N_STATS] OVS_GUARDED;
};
/* A port in a netdev-based datapath. */
struct dp_netdev_port {
struct cmap_node node; /* Node in dp_netdev's 'ports'. */
@@ -261,15 +237,22 @@ struct dp_netdev_port {
char *type; /* Port type as requested by user. */
};
/* A flow in dp_netdev's 'flow_table'.
/* Contained by struct dp_netdev_flow's 'stats' member. */
struct dp_netdev_flow_stats {
long long int used; /* Last used time, in monotonic msecs. */
long long int packet_count; /* Number of packets matched. */
long long int byte_count; /* Number of bytes matched. */
uint16_t tcp_flags; /* Bitwise-OR of seen tcp_flags values. */
};
/* A flow in 'dp_netdev_pmd_thread's 'flow_table'.
*
*
* Thread-safety
* =============
*
* Except near the beginning or ending of its lifespan, rule 'rule' belongs to
* its dp_netdev's classifier. The text below calls this classifier 'cls'.
* its pmd thread's classifier. The text below calls this classifier 'cls'.
*
* Motivation
* ----------
@@ -303,9 +286,12 @@ struct dp_netdev_flow {
bool dead;
/* Hash table index by unmasked flow. */
const struct cmap_node node; /* In owning dp_netdev's 'flow_table'. */
const struct cmap_node node; /* In owning dp_netdev_pmd_thread's */
/* 'flow_table'. */
const ovs_u128 ufid; /* Unique flow identifier. */
const struct flow flow; /* Unmasked flow that created this entry. */
const int pmd_id; /* The 'core_id' of pmd thread owning this */
/* flow. */
/* Number of references.
* The classifier owns one reference.
@@ -313,10 +299,8 @@ struct dp_netdev_flow {
* reference. */
struct ovs_refcount ref_cnt;
/* Statistics.
*
* Reading or writing these members requires 'mutex'. */
struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
/* Statistics. */
struct dp_netdev_flow_stats stats;
/* Actions. */
OVSRCU_TYPE(struct dp_netdev_actions *) actions;
@@ -331,16 +315,6 @@ static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t,
struct flow *);
/* Contained by struct dp_netdev_flow's 'stats' member. */
struct dp_netdev_flow_stats {
struct ovs_mutex mutex; /* Guards all the other members. */
long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
long long int packet_count OVS_GUARDED; /* Number of packets matched. */
long long int byte_count OVS_GUARDED; /* Number of bytes matched. */
uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
};
/* A set of datapath actions within a "struct dp_netdev_flow".
*
*
@@ -361,20 +335,31 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
const struct dp_netdev_flow *);
static void dp_netdev_actions_free(struct dp_netdev_actions *);
/* Contained by struct dp_netdev_pmd_thread's 'stats' member. */
struct dp_netdev_pmd_stats {
/* Indexed by DP_STAT_*. */
unsigned long long int n[DP_N_STATS];
};
/* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
* the performance overhead of interrupt processing. Therefore netdev can
* not implement rx-wait for these devices. dpif-netdev needs to poll
* these device to check for recv buffer. pmd-thread does polling for
* devices assigned to itself thread.
* devices assigned to itself.
*
* DPDK used PMD for accessing NIC.
*
* Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
* I/O of all non-pmd threads. There will be no actual thread created
* for the instance.
**/
*
* Each struct has its own flow table and classifier. Packets received
* from managed ports are looked up in the corresponding pmd thread's
* flow table, and are executed with the found actions.
* */
struct dp_netdev_pmd_thread {
struct dp_netdev *dp;
struct ovs_refcount ref_cnt; /* Every reference must be refcount'ed. */
struct cmap_node node; /* In 'dp->poll_threads'. */
pthread_cond_t cond; /* For synchronizing pmd thread reload. */
@@ -385,6 +370,19 @@ struct dp_netdev_pmd_thread {
* need to be protected (e.g. by 'dp_netdev_mutex'). All other
* instances will only be accessed by its own pmd thread. */
struct emc_cache flow_cache;
/* Classifier and Flow-Table.
*
* Writers of 'flow_table' must take the 'flow_mutex'. Corresponding
* changes to 'cls' must be made while still holding the 'flow_mutex'.
*/
struct ovs_mutex flow_mutex;
struct dpcls cls;
struct cmap flow_table OVS_GUARDED; /* Flow table. */
/* Statistics. */
struct dp_netdev_pmd_stats stats;
struct latch exit_latch; /* For terminating the pmd thread. */
atomic_uint change_seq; /* For reloading pmd ports. */
pthread_t thread;
@@ -409,7 +407,6 @@ static int get_port_by_name(struct dp_netdev *dp, const char *devname,
struct dp_netdev_port **portp);
static void dp_netdev_free(struct dp_netdev *)
OVS_REQUIRES(dp_netdev_mutex);
static void dp_netdev_flow_flush(struct dp_netdev *);
static int do_add_port(struct dp_netdev *dp, const char *devname,
const char *type, odp_port_t port_no)
OVS_REQUIRES(dp->port_mutex);
@@ -430,13 +427,19 @@ void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev *dp, int index,
int core_id, int numa_id);
static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
int core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
static inline bool emc_entry_alive(struct emc_entry *ce);
static void emc_clear_entry(struct emc_entry *ce);
@@ -604,12 +607,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
ovs_refcount_init(&dp->ref_cnt);
atomic_flag_clear(&dp->destroyed);
ovs_mutex_init(&dp->flow_mutex);
dpcls_init(&dp->cls);
cmap_init(&dp->flow_table);
ovsthread_stats_init(&dp->stats);
ovs_mutex_init(&dp->port_mutex);
cmap_init(&dp->ports);
dp->port_seq = seq_create();
@@ -686,8 +683,6 @@ dp_netdev_free(struct dp_netdev *dp)
OVS_REQUIRES(dp_netdev_mutex)
{
struct dp_netdev_port *port;
struct dp_netdev_stats *bucket;
int i;
shash_find_and_delete(&dp_netdevs, dp->name);
@@ -696,22 +691,12 @@ dp_netdev_free(struct dp_netdev *dp)
ovs_mutex_destroy(&dp->non_pmd_mutex);
ovsthread_key_delete(dp->per_pmd_key);
dp_netdev_flow_flush(dp);
ovs_mutex_lock(&dp->port_mutex);
CMAP_FOR_EACH (port, node, &dp->ports) {
do_del_port(dp, port);
}
ovs_mutex_unlock(&dp->port_mutex);
OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
ovs_mutex_destroy(&bucket->mutex);
free_cacheline(bucket);
}
ovsthread_stats_destroy(&dp->stats);
dpcls_destroy(&dp->cls);
cmap_destroy(&dp->flow_table);
ovs_mutex_destroy(&dp->flow_mutex);
seq_destroy(dp->port_seq);
cmap_destroy(&dp->ports);
@@ -765,18 +750,14 @@ static int
dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_stats *bucket;
size_t i;
struct dp_netdev_pmd_thread *pmd;
stats->n_flows = cmap_count(&dp->flow_table);
stats->n_hit = stats->n_missed = stats->n_lost = 0;
OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
ovs_mutex_lock(&bucket->mutex);
stats->n_hit += bucket->n[DP_STAT_HIT];
stats->n_missed += bucket->n[DP_STAT_MISS];
stats->n_lost += bucket->n[DP_STAT_LOST];
ovs_mutex_unlock(&bucket->mutex);
stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0;
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
stats->n_flows += cmap_count(&pmd->flow_table);
stats->n_hit += pmd->stats.n[DP_STAT_HIT];
stats->n_missed += pmd->stats.n[DP_STAT_MISS];
stats->n_lost += pmd->stats.n[DP_STAT_LOST];
}
stats->n_masks = UINT32_MAX;
stats->n_mask_hit = UINT64_MAX;
@@ -1140,15 +1121,6 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
static void
dp_netdev_flow_free(struct dp_netdev_flow *flow)
{
struct dp_netdev_flow_stats *bucket;
size_t i;
OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
ovs_mutex_destroy(&bucket->mutex);
free_cacheline(bucket);
}
ovsthread_stats_destroy(&flow->stats);
dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
free(flow);
}
@@ -1167,36 +1139,41 @@ dp_netdev_flow_hash(const ovs_u128 *ufid)
}
static void
dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
OVS_REQUIRES(dp->flow_mutex)
dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_flow *flow)
OVS_REQUIRES(pmd->flow_mutex)
{
struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
dpcls_remove(&dp->cls, &flow->cr);
cmap_remove(&dp->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
dpcls_remove(&pmd->cls, &flow->cr);
cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
flow->dead = true;
dp_netdev_flow_unref(flow);
}
static void
dp_netdev_flow_flush(struct dp_netdev *dp)
dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd)
{
struct dp_netdev_flow *netdev_flow;
ovs_mutex_lock(&dp->flow_mutex);
CMAP_FOR_EACH (netdev_flow, node, &dp->flow_table) {
dp_netdev_remove_flow(dp, netdev_flow);
ovs_mutex_lock(&pmd->flow_mutex);
CMAP_FOR_EACH (netdev_flow, node, &pmd->flow_table) {
dp_netdev_pmd_remove_flow(pmd, netdev_flow);
}
ovs_mutex_unlock(&dp->flow_mutex);
ovs_mutex_unlock(&pmd->flow_mutex);
}
static int
dpif_netdev_flow_flush(struct dpif *dpif)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *pmd;
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
dp_netdev_pmd_flow_flush(pmd);
}
dp_netdev_flow_flush(dp);
return 0;
}
@@ -1528,21 +1505,22 @@ emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
}
static struct dp_netdev_flow *
dp_netdev_lookup_flow(const struct dp_netdev *dp,
const struct netdev_flow_key *key)
dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd,
const struct netdev_flow_key *key)
{
struct dp_netdev_flow *netdev_flow;
struct dpcls_rule *rule;
dpcls_lookup(&dp->cls, key, &rule, 1);
dpcls_lookup(&pmd->cls, key, &rule, 1);
netdev_flow = dp_netdev_flow_cast(rule);
return netdev_flow;
}
static struct dp_netdev_flow *
dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp,
const struct nlattr *key, size_t key_len)
dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
const ovs_u128 *ufidp, const struct nlattr *key,
size_t key_len)
{
struct dp_netdev_flow *netdev_flow;
struct flow flow;
@@ -1551,13 +1529,13 @@ dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp,
/* If a UFID is not provided, determine one based on the key. */
if (!ufidp && key && key_len
&& !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) {
dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
ufidp = &ufid;
}
if (ufidp) {
CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp),
&dp->flow_table) {
&pmd->flow_table) {
if (ovs_u128_equal(&netdev_flow->ufid, ufidp)) {
return netdev_flow;
}
@@ -1571,18 +1549,10 @@ static void
get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow,
struct dpif_flow_stats *stats)
{
struct dp_netdev_flow_stats *bucket;
size_t i;
memset(stats, 0, sizeof *stats);
OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
ovs_mutex_lock(&bucket->mutex);
stats->n_packets += bucket->packet_count;
stats->n_bytes += bucket->byte_count;
stats->used = MAX(stats->used, bucket->used);
stats->tcp_flags |= bucket->tcp_flags;
ovs_mutex_unlock(&bucket->mutex);
}
stats->n_packets = netdev_flow->stats.packet_count;
stats->n_bytes = netdev_flow->stats.byte_count;
stats->used = netdev_flow->stats.used;
stats->tcp_flags = netdev_flow->stats.tcp_flags;
}
/* Converts to the dpif_flow format, using 'key_buf' and 'mask_buf' for
@@ -1626,6 +1596,7 @@ dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
flow->ufid = netdev_flow->ufid;
flow->ufid_present = true;
flow->pmd_id = netdev_flow->pmd_id;
get_dpif_flow_stats(netdev_flow, &flow->stats);
}
@@ -1726,24 +1697,34 @@ dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_flow *netdev_flow;
struct dp_netdev_pmd_thread *pmd;
int pmd_id = get->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : get->pmd_id;
int error = 0;
netdev_flow = dp_netdev_find_flow(dp, get->ufid, get->key, get->key_len);
pmd = dp_netdev_get_pmd(dp, pmd_id);
if (!pmd) {
return EINVAL;
}
netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
get->key_len);
if (netdev_flow) {
dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
get->flow, false);
} else {
error = ENOENT;
}
dp_netdev_pmd_unref(pmd);
return error;
}
static struct dp_netdev_flow *
dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
const ovs_u128 *ufid,
dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
struct match *match, const ovs_u128 *ufid,
const struct nlattr *actions, size_t actions_len)
OVS_REQUIRES(dp->flow_mutex)
OVS_REQUIRES(pmd->flow_mutex)
{
struct dp_netdev_flow *flow;
struct netdev_flow_key mask;
@@ -1754,18 +1735,19 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
/* Do not allocate extra space. */
flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
memset(&flow->stats, 0, sizeof flow->stats);
flow->dead = false;
*CONST_CAST(int *, &flow->pmd_id) = pmd->core_id;
*CONST_CAST(struct flow *, &flow->flow) = match->flow;
*CONST_CAST(ovs_u128 *, &flow->ufid) = *ufid;
ovs_refcount_init(&flow->ref_cnt);
ovsthread_stats_init(&flow->stats);
ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len));
cmap_insert(&dp->flow_table,
cmap_insert(&pmd->flow_table,
CONST_CAST(struct cmap_node *, &flow->node),
dp_netdev_flow_hash(&flow->ufid));
netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask);
dpcls_insert(&dp->cls, &flow->cr, &mask);
dpcls_insert(&pmd->cls, &flow->cr, &mask);
if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
struct match match;
@@ -1789,30 +1771,16 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
return flow;
}
/* Zeroes the accumulated statistics of 'netdev_flow'.  The stats are
 * sharded into per-thread buckets; each bucket is cleared under its own
 * mutex. */
static void
clear_stats(struct dp_netdev_flow *netdev_flow)
{
struct dp_netdev_flow_stats *bucket;
size_t i;
OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
ovs_mutex_lock(&bucket->mutex);
bucket->used = 0;
bucket->packet_count = 0;
bucket->byte_count = 0;
bucket->tcp_flags = 0;
ovs_mutex_unlock(&bucket->mutex);
}
}
static int
dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_flow *netdev_flow;
struct netdev_flow_key key;
struct dp_netdev_pmd_thread *pmd;
struct match match;
ovs_u128 ufid;
int pmd_id = put->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : put->pmd_id;
int error;
error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow);
@@ -1826,6 +1794,11 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
return error;
}
pmd = dp_netdev_get_pmd(dp, pmd_id);
if (!pmd) {
return EINVAL;
}
/* Must produce a netdev_flow_key for lookup.
* This interface is no longer performance critical, since it is not used
* for upcall processing any more. */
@@ -1837,15 +1810,15 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
}
ovs_mutex_lock(&dp->flow_mutex);
netdev_flow = dp_netdev_lookup_flow(dp, &key);
ovs_mutex_lock(&pmd->flow_mutex);
netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key);
if (!netdev_flow) {
if (put->flags & DPIF_FP_CREATE) {
if (cmap_count(&dp->flow_table) < MAX_FLOWS) {
if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
if (put->stats) {
memset(put->stats, 0, sizeof *put->stats);
}
dp_netdev_flow_add(dp, &match, &ufid, put->actions,
dp_netdev_flow_add(pmd, &match, &ufid, put->actions,
put->actions_len);
error = 0;
} else {
@@ -1870,7 +1843,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
get_dpif_flow_stats(netdev_flow, put->stats);
}
if (put->flags & DPIF_FP_ZERO_STATS) {
clear_stats(netdev_flow);
memset(&netdev_flow->stats, 0, sizeof netdev_flow->stats);
}
ovsrcu_postpone(dp_netdev_actions_free, old_actions);
@@ -1881,7 +1854,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
error = EINVAL;
}
}
ovs_mutex_unlock(&dp->flow_mutex);
ovs_mutex_unlock(&pmd->flow_mutex);
dp_netdev_pmd_unref(pmd);
return error;
}
@@ -1891,26 +1865,37 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_flow *netdev_flow;
struct dp_netdev_pmd_thread *pmd;
int pmd_id = del->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : del->pmd_id;
int error = 0;
ovs_mutex_lock(&dp->flow_mutex);
netdev_flow = dp_netdev_find_flow(dp, del->ufid, del->key, del->key_len);
pmd = dp_netdev_get_pmd(dp, pmd_id);
if (!pmd) {
return EINVAL;
}
ovs_mutex_lock(&pmd->flow_mutex);
netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key,
del->key_len);
if (netdev_flow) {
if (del->stats) {
get_dpif_flow_stats(netdev_flow, del->stats);
}
dp_netdev_remove_flow(dp, netdev_flow);
dp_netdev_pmd_remove_flow(pmd, netdev_flow);
} else {
error = ENOENT;
}
ovs_mutex_unlock(&dp->flow_mutex);
ovs_mutex_unlock(&pmd->flow_mutex);
dp_netdev_pmd_unref(pmd);
return error;
}
struct dpif_netdev_flow_dump {
struct dpif_flow_dump up;
struct cmap_position pos;
struct cmap_position poll_thread_pos;
struct cmap_position flow_pos;
struct dp_netdev_pmd_thread *cur_pmd;
int status;
struct ovs_mutex mutex;
};
@@ -1926,10 +1911,8 @@ dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
{
struct dpif_netdev_flow_dump *dump;
dump = xmalloc(sizeof *dump);
dump = xzalloc(sizeof *dump);
dpif_flow_dump_init(&dump->up, dpif_);
memset(&dump->pos, 0, sizeof dump->pos);
dump->status = 0;
dump->up.terse = terse;
ovs_mutex_init(&dump->mutex);
@@ -1987,26 +1970,58 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
struct dpif_netdev_flow_dump_thread *thread
= dpif_netdev_flow_dump_thread_cast(thread_);
struct dpif_netdev_flow_dump *dump = thread->dump;
struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
int n_flows = 0;
int i;
ovs_mutex_lock(&dump->mutex);
if (!dump->status) {
for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH);
n_flows++) {
struct cmap_node *node;
struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
struct dp_netdev_pmd_thread *pmd = dump->cur_pmd;
int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
/* First call to dump_next(), extracts the first pmd thread.
* If there is no pmd thread, returns immediately. */
if (!pmd) {
pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
if (!pmd) {
ovs_mutex_unlock(&dump->mutex);
return n_flows;
node = cmap_next_position(&dp->flow_table, &dump->pos);
if (!node) {
dump->status = EOF;
break;
}
netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow,
node);
}
do {
for (n_flows = 0; n_flows < flow_limit; n_flows++) {
struct cmap_node *node;
node = cmap_next_position(&pmd->flow_table, &dump->flow_pos);
if (!node) {
break;
}
netdev_flows[n_flows] = CONTAINER_OF(node,
struct dp_netdev_flow,
node);
}
/* When finishing dumping the current pmd thread, moves to
* the next. */
if (n_flows < flow_limit) {
memset(&dump->flow_pos, 0, sizeof dump->flow_pos);
dp_netdev_pmd_unref(pmd);
pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
if (!pmd) {
dump->status = EOF;
break;
}
}
/* Keeps the reference to next caller. */
dump->cur_pmd = pmd;
/* If the current dump is empty, do not exit the loop, since the
* remaining pmds could have flows to be dumped. Just dumps again
* on the new 'pmd'. */
} while (!n_flows);
}
ovs_mutex_unlock(&dump->mutex);
@@ -2057,9 +2072,11 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
ovs_mutex_lock(&dp->non_pmd_mutex);
ovs_mutex_lock(&dp->port_mutex);
}
dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions,
execute->actions_len);
if (pmd->core_id == NON_PMD_CORE_ID) {
dp_netdev_pmd_unref(pmd);
ovs_mutex_unlock(&dp->port_mutex);
ovs_mutex_unlock(&dp->non_pmd_mutex);
}
@@ -2257,6 +2274,8 @@ dpif_netdev_run(struct dpif *dpif)
}
}
ovs_mutex_unlock(&dp->non_pmd_mutex);
dp_netdev_pmd_unref(non_pmd);
tnl_arp_cache_run();
new_tnl_seq = seq_read(tnl_conf_seq);
@@ -2440,7 +2459,10 @@ dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
ovs_mutex_unlock(&pmd->cond_mutex);
}
/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'. Returns
* the pointer if succeeds, otherwise, NULL.
*
* Caller must unrefs the returned reference. */
static struct dp_netdev_pmd_thread *
dp_netdev_get_pmd(struct dp_netdev *dp, int core_id)
{
@@ -2448,10 +2470,12 @@ dp_netdev_get_pmd(struct dp_netdev *dp, int core_id)
const struct cmap_node *pnode;
pnode = cmap_find(&dp->poll_threads, hash_int(core_id, 0));
ovs_assert(pnode);
if (!pnode) {
return NULL;
}
pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
return pmd;
return dp_netdev_pmd_try_ref(pmd) ? pmd : NULL;
}
/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
@@ -2465,6 +2489,41 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
OVS_NUMA_UNSPEC);
}
/* Caller must have valid pointer to 'pmd'.
 *
 * Tries to take a reference on 'pmd'.  Returns true on success; returns
 * false when the refcount can no longer be taken (presumably because it
 * has already dropped to zero and 'pmd' is being destroyed -- semantics
 * of ovs_refcount_try_ref_rcu(), TODO confirm). */
static bool
dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd)
{
return ovs_refcount_try_ref_rcu(&pmd->ref_cnt);
}
/* Releases one reference on 'pmd'.  When the last reference is dropped
 * (ovs_refcount_unref() returns 1), schedules destruction of the struct
 * after the current RCU grace period via ovsrcu_postpone().  Safe to call
 * with a NULL 'pmd'. */
static void
dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd)
{
if (pmd && ovs_refcount_unref(&pmd->ref_cnt) == 1) {
ovsrcu_postpone(dp_netdev_destroy_pmd, pmd);
}
}
/* Given cmap position 'pos', tries to ref the next node in
 * 'dp->poll_threads'.  If try_ref() fails (the candidate is being
 * destroyed), keeps advancing to the next node until one is successfully
 * referenced or the end of the cmap is reached, in which case NULL is
 * returned.
 *
 * Caller must unref the returned reference with dp_netdev_pmd_unref(). */
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
{
struct dp_netdev_pmd_thread *next;
do {
struct cmap_node *node;
node = cmap_next_position(&dp->poll_threads, pos);
next = node ? CONTAINER_OF(node, struct dp_netdev_pmd_thread, node)
: NULL;
} while (next && !dp_netdev_pmd_try_ref(next));
return next;
}
/* Configures the 'pmd' based on the input argument. */
static void
dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
@@ -2474,10 +2533,15 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->index = index;
pmd->core_id = core_id;
pmd->numa_id = numa_id;
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
xpthread_cond_init(&pmd->cond, NULL);
ovs_mutex_init(&pmd->cond_mutex);
ovs_mutex_init(&pmd->flow_mutex);
dpcls_init(&pmd->cls);
cmap_init(&pmd->flow_table);
/* init the 'flow_cache' since there is no
* actual thread created for NON_PMD_CORE_ID. */
if (core_id == NON_PMD_CORE_ID) {
@@ -2487,13 +2551,26 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
hash_int(core_id, 0));
}
/* Stops the pmd thread, removes it from the 'dp->poll_threads'
* and destroys the struct. */
static void
dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
{
dp_netdev_pmd_flow_flush(pmd);
dpcls_destroy(&pmd->cls);
cmap_destroy(&pmd->flow_table);
ovs_mutex_destroy(&pmd->flow_mutex);
latch_destroy(&pmd->exit_latch);
xpthread_cond_destroy(&pmd->cond);
ovs_mutex_destroy(&pmd->cond_mutex);
free(pmd);
}
/* Stops the pmd thread, removes it from the 'dp->poll_threads',
* and unrefs the struct. */
static void
dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
{
/* Uninit the 'flow_cache' since there is
* no actual thread uninit it. */
* no actual thread uninit it for NON_PMD_CORE_ID. */
if (pmd->core_id == NON_PMD_CORE_ID) {
emc_cache_uninit(&pmd->flow_cache);
} else {
@@ -2503,10 +2580,7 @@ dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
xpthread_join(pmd->thread, NULL);
}
cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
latch_destroy(&pmd->exit_latch);
xpthread_cond_destroy(&pmd->cond);
ovs_mutex_destroy(&pmd->cond_mutex);
free(pmd);
dp_netdev_pmd_unref(pmd);
}
/* Destroys all pmd threads. */
@@ -2578,14 +2652,6 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
}
static void *
dp_netdev_flow_stats_new_cb(void)
{
struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
ovs_mutex_init(&bucket->mutex);
return bucket;
}
/* Called after pmd threads config change. Restarts pmd threads with
* new configuration. */
static void
@@ -2609,53 +2675,35 @@ dpif_netdev_get_datapath_version(void)
}
static void
dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
int cnt, int size,
dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size,
uint16_t tcp_flags)
{
long long int now = time_msec();
struct dp_netdev_flow_stats *bucket;
bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
dp_netdev_flow_stats_new_cb);
ovs_mutex_lock(&bucket->mutex);
bucket->used = MAX(now, bucket->used);
bucket->packet_count += cnt;
bucket->byte_count += size;
bucket->tcp_flags |= tcp_flags;
ovs_mutex_unlock(&bucket->mutex);
}
static void *
dp_netdev_stats_new_cb(void)
{
struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
ovs_mutex_init(&bucket->mutex);
return bucket;
netdev_flow->stats.used = MAX(now, netdev_flow->stats.used);
netdev_flow->stats.packet_count += cnt;
netdev_flow->stats.byte_count += size;
netdev_flow->stats.tcp_flags |= tcp_flags;
}
static void
dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd,
enum dp_stat_type type, int cnt)
{
struct dp_netdev_stats *bucket;
bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
ovs_mutex_lock(&bucket->mutex);
bucket->n[type] += cnt;
ovs_mutex_unlock(&bucket->mutex);
pmd->stats.n[type] += cnt;
}
static int
dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dpif_packet *packet_,
struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid,
enum dpif_upcall_type type, const struct nlattr *userdata,
struct ofpbuf *actions, struct ofpbuf *put_actions)
{
struct dp_netdev *dp = pmd->dp;
struct ofpbuf *packet = &packet_->ofpbuf;
if (type == DPIF_UC_MISS) {
dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
dp_netdev_count_packet(pmd, DP_STAT_MISS, 1);
}
if (OVS_UNLIKELY(!dp->upcall_cb)) {
@@ -2684,8 +2732,8 @@ dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
ds_destroy(&ds);
}
return dp->upcall_cb(packet, flow, ufid, type, userdata, actions, wc,
put_actions, dp->upcall_aux);
return dp->upcall_cb(packet, flow, ufid, pmd->core_id, type, userdata,
actions, wc, put_actions, dp->upcall_aux);
}
static inline uint32_t
@@ -2746,7 +2794,7 @@ packet_batch_execute(struct packet_batch *batch,
dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
actions->actions, actions->size);
dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
dp_netdev_count_packet(pmd, DP_STAT_HIT, batch->packet_count);
}
static inline bool
@@ -2865,7 +2913,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
/* Key length is needed in all the cases, hash computed on demand. */
keys[i].len = netdev_flow_key_size(count_1bits(keys[i].mf.map));
}
any_miss = !dpcls_lookup(&dp->cls, keys, rules, cnt);
any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt);
if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
struct ofpbuf actions, put_actions;
@@ -2887,7 +2935,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
/* It's possible that an earlier slow path execution installed
* a rule covering this flow. In this case, it's a lot cheaper
* to catch it here than execute a miss. */
netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
if (netdev_flow) {
rules[i] = &netdev_flow->cr;
continue;
@@ -2899,7 +2947,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
ofpbuf_clear(&put_actions);
dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid);
error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc,
error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc,
&ufid, DPIF_UC_MISS, NULL, &actions,
&put_actions);
if (OVS_UNLIKELY(error && error != ENOSPC)) {
@@ -2924,14 +2972,14 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
* mutex lock outside the loop, but that's an awful long time
* to be locking everyone out of making flow installs. If we
* move to a per-core classifier, it would be reasonable. */
ovs_mutex_lock(&dp->flow_mutex);
netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
ovs_mutex_lock(&pmd->flow_mutex);
netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
if (OVS_LIKELY(!netdev_flow)) {
netdev_flow = dp_netdev_flow_add(dp, &match, &ufid,
netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid,
ofpbuf_data(add_actions),
ofpbuf_size(add_actions));
}
ovs_mutex_unlock(&dp->flow_mutex);
ovs_mutex_unlock(&pmd->flow_mutex);
emc_insert(flow_cache, &keys[i], netdev_flow);
}
@@ -2950,7 +2998,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
}
}
dp_netdev_count_packet(dp, DP_STAT_LOST, dropped_cnt);
dp_netdev_count_packet(pmd, DP_STAT_LOST, dropped_cnt);
}
n_batches = 0;
@@ -3141,7 +3189,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
flow_extract(&packets[i]->ofpbuf, &packets[i]->md, &flow);
dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
error = dp_netdev_upcall(dp, packets[i], &flow, NULL, &ufid,
error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid,
DPIF_UC_ACTION, userdata,&actions,
NULL);
if (!error || error == ENOSPC) {

View File

@@ -1430,6 +1430,7 @@ dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow,
dpif_flow->ufid_present = datapath_flow->ufid_present;
if (datapath_flow->ufid_present) {
dpif_flow->ufid = datapath_flow->ufid;
dpif_flow->pmd_id = PMD_ID_NULL;
} else {
ovs_assert(datapath_flow->key && datapath_flow->key_len);
dpif_flow_hash(dpif, datapath_flow->key, datapath_flow->key_len,

View File

@@ -878,7 +878,7 @@ dpif_probe_feature(struct dpif *dpif, const char *name,
* previous run are still present in the datapath. */
error = dpif_flow_put(dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY | DPIF_FP_PROBE,
ofpbuf_data(key), ofpbuf_size(key), NULL, 0, NULL, 0,
ufid, NULL);
ufid, PMD_ID_NULL, NULL);
if (error) {
if (error != EINVAL) {
VLOG_WARN("%s: %s flow probe failed (%s)",
@@ -889,14 +889,14 @@ dpif_probe_feature(struct dpif *dpif, const char *name,
ofpbuf_use_stack(&reply, &stub, sizeof stub);
error = dpif_flow_get(dpif, ofpbuf_data(key), ofpbuf_size(key), ufid,
&reply, &flow);
PMD_ID_NULL, &reply, &flow);
if (!error
&& (!ufid || (flow.ufid_present && ovs_u128_equal(ufid, &flow.ufid)))) {
enable_feature = true;
}
error = dpif_flow_del(dpif, ofpbuf_data(key), ofpbuf_size(key), ufid,
NULL);
PMD_ID_NULL, NULL);
if (error) {
VLOG_WARN("%s: failed to delete %s feature probe flow",
dpif_name(dpif), name);
@@ -909,7 +909,7 @@ dpif_probe_feature(struct dpif *dpif, const char *name,
int
dpif_flow_get(struct dpif *dpif,
const struct nlattr *key, size_t key_len, const ovs_u128 *ufid,
struct ofpbuf *buf, struct dpif_flow *flow)
const int pmd_id, struct ofpbuf *buf, struct dpif_flow *flow)
{
struct dpif_op *opp;
struct dpif_op op;
@@ -918,6 +918,7 @@ dpif_flow_get(struct dpif *dpif,
op.u.flow_get.key = key;
op.u.flow_get.key_len = key_len;
op.u.flow_get.ufid = ufid;
op.u.flow_get.pmd_id = pmd_id;
op.u.flow_get.buffer = buf;
memset(flow, 0, sizeof *flow);
@@ -937,7 +938,8 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
const struct nlattr *key, size_t key_len,
const struct nlattr *mask, size_t mask_len,
const struct nlattr *actions, size_t actions_len,
const ovs_u128 *ufid, struct dpif_flow_stats *stats)
const ovs_u128 *ufid, const int pmd_id,
struct dpif_flow_stats *stats)
{
struct dpif_op *opp;
struct dpif_op op;
@@ -951,6 +953,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
op.u.flow_put.actions = actions;
op.u.flow_put.actions_len = actions_len;
op.u.flow_put.ufid = ufid;
op.u.flow_put.pmd_id = pmd_id;
op.u.flow_put.stats = stats;
opp = &op;
@@ -963,7 +966,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
int
dpif_flow_del(struct dpif *dpif,
const struct nlattr *key, size_t key_len, const ovs_u128 *ufid,
struct dpif_flow_stats *stats)
const int pmd_id, struct dpif_flow_stats *stats)
{
struct dpif_op *opp;
struct dpif_op op;
@@ -972,6 +975,7 @@ dpif_flow_del(struct dpif *dpif,
op.u.flow_del.key = key;
op.u.flow_del.key_len = key_len;
op.u.flow_del.ufid = ufid;
op.u.flow_del.pmd_id = pmd_id;
op.u.flow_del.stats = stats;
op.u.flow_del.terse = false;

View File

@@ -391,6 +391,7 @@
#include "netdev.h"
#include "ofpbuf.h"
#include "openflow/openflow.h"
#include "ovs-numa.h"
#include "packets.h"
#include "util.h"
@@ -524,14 +525,15 @@ int dpif_flow_put(struct dpif *, enum dpif_flow_put_flags,
const struct nlattr *key, size_t key_len,
const struct nlattr *mask, size_t mask_len,
const struct nlattr *actions, size_t actions_len,
const ovs_u128 *ufid, struct dpif_flow_stats *);
const ovs_u128 *ufid, const int pmd_id,
struct dpif_flow_stats *);
int dpif_flow_del(struct dpif *,
const struct nlattr *key, size_t key_len,
const ovs_u128 *ufid, struct dpif_flow_stats *);
const ovs_u128 *ufid, const int pmd_id,
struct dpif_flow_stats *);
int dpif_flow_get(struct dpif *,
const struct nlattr *key, size_t key_len,
const ovs_u128 *ufid,
const ovs_u128 *ufid, const int pmd_id,
struct ofpbuf *, struct dpif_flow *);
/* Flow dumping interface
@@ -569,6 +571,8 @@ struct dpif_flow_dump_thread *dpif_flow_dump_thread_create(
struct dpif_flow_dump *);
void dpif_flow_dump_thread_destroy(struct dpif_flow_dump_thread *);
#define PMD_ID_NULL OVS_CORE_UNSPEC
/* A datapath flow as dumped by dpif_flow_dump_next(). */
struct dpif_flow {
const struct nlattr *key; /* Flow key, as OVS_KEY_ATTR_* attrs. */
@@ -579,6 +583,7 @@ struct dpif_flow {
size_t actions_len; /* 'actions' length in bytes. */
ovs_u128 ufid; /* Unique flow identifier. */
bool ufid_present; /* True if 'ufid' was provided by datapath.*/
int pmd_id; /* Datapath poll mode dirver id. */
struct dpif_flow_stats stats; /* Flow statistics. */
};
int dpif_flow_dump_next(struct dpif_flow_dump_thread *,
@@ -620,6 +625,10 @@ enum dpif_op_type {
*
* If the operation succeeds, then 'stats', if nonnull, will be set to the
* flow's statistics before the update.
*
* - If the datapath implements multiple pmd thread with its own flow
* table, 'pmd_id' should be used to specify the particular polling
* thread for the operation.
*/
struct dpif_flow_put {
/* Input. */
@@ -631,6 +640,7 @@ struct dpif_flow_put {
const struct nlattr *actions; /* Actions to perform on flow. */
size_t actions_len; /* Length of 'actions' in bytes. */
const ovs_u128 *ufid; /* Optional unique flow identifier. */
int pmd_id; /* Datapath poll mode driver id. */
/* Output. */
struct dpif_flow_stats *stats; /* Optional flow statistics. */
@@ -648,6 +658,10 @@ struct dpif_flow_put {
* Callers should always provide the 'key' to improve dpif logging in the event
* of errors or unexpected behaviour.
*
* If the datapath implements multiple polling thread with its own flow table,
* 'pmd_id' should be used to specify the particular polling thread for the
* operation.
*
* If the operation succeeds, then 'stats', if nonnull, will be set to the
* flow's statistics before its deletion. */
struct dpif_flow_del {
@@ -657,6 +671,7 @@ struct dpif_flow_del {
const ovs_u128 *ufid; /* Unique identifier of flow to delete. */
bool terse; /* OK to skip sending/receiving full flow
* info? */
int pmd_id; /* Datapath poll mode driver id. */
/* Output. */
struct dpif_flow_stats *stats; /* Optional flow statistics. */
@@ -706,6 +721,10 @@ struct dpif_execute {
* Callers should always provide 'key' to improve dpif logging in the event of
* errors or unexpected behaviour.
*
* If the datapath implements multiple polling thread with its own flow table,
* 'pmd_id' should be used to specify the particular polling thread for the
* operation.
*
* Succeeds with status 0 if the flow is fetched, or fails with ENOENT if no
* such flow exists. Other failures are indicated with a positive errno value.
*/
@@ -714,6 +733,7 @@ struct dpif_flow_get {
const struct nlattr *key; /* Flow to get. */
size_t key_len; /* Length of 'key' in bytes. */
const ovs_u128 *ufid; /* Unique identifier of flow to get. */
int pmd_id; /* Datapath poll mode driver id. */
struct ofpbuf *buffer; /* Storage for output parameters. */
/* Output. */
@@ -770,8 +790,9 @@ struct dpif_upcall {
/* A callback to process an upcall, currently implemented only by dpif-netdev.
*
* The caller provides the 'packet' and 'flow' to process, the corresponding
* 'ufid' as generated by dpif_flow_hash(), the 'type' of the upcall, and if
* 'type' is DPIF_UC_ACTION then the 'userdata' attached to the action.
* 'ufid' as generated by dpif_flow_hash(), the polling thread id 'pmd_id',
* the 'type' of the upcall, and if 'type' is DPIF_UC_ACTION then the
* 'userdata' attached to the action.
*
* The callback must fill in 'actions' with the datapath actions to apply to
* 'packet'. 'wc' and 'put_actions' will either be both null or both nonnull.
@@ -787,6 +808,7 @@ struct dpif_upcall {
typedef int upcall_callback(const struct ofpbuf *packet,
const struct flow *flow,
ovs_u128 *ufid,
int pmd_id,
enum dpif_upcall_type type,
const struct nlattr *userdata,
struct ofpbuf *actions,

View File

@@ -158,6 +158,7 @@ struct upcall {
* may be used with other datapaths. */
const struct flow *flow; /* Parsed representation of the packet. */
const ovs_u128 *ufid; /* Unique identifier for 'flow'. */
int pmd_id; /* Datapath poll mode driver id. */
const struct ofpbuf *packet; /* Packet associated with this upcall. */
ofp_port_t in_port; /* OpenFlow in port, or OFPP_NONE. */
@@ -211,6 +212,7 @@ struct udpif_key {
ovs_u128 ufid; /* Unique flow identifier. */
bool ufid_present; /* True if 'ufid' is in datapath. */
uint32_t hash; /* Pre-computed hash for 'key'. */
int pmd_id; /* Datapath poll mode driver id. */
struct ovs_mutex mutex; /* Guards the following. */
struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
@@ -288,7 +290,7 @@ static enum upcall_type classify_upcall(enum dpif_upcall_type type,
static int upcall_receive(struct upcall *, const struct dpif_backer *,
const struct ofpbuf *packet, enum dpif_upcall_type,
const struct nlattr *userdata, const struct flow *,
const ovs_u128 *ufid);
const ovs_u128 *ufid, const int pmd_id);
static void upcall_uninit(struct upcall *);
static upcall_callback upcall_cb;
@@ -652,7 +654,7 @@ recv_upcalls(struct handler *handler)
error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
dupcall->type, dupcall->userdata, flow,
&dupcall->ufid);
&dupcall->ufid, PMD_ID_NULL);
if (error) {
if (error == ENODEV) {
/* Received packet on datapath port for which we couldn't
@@ -661,7 +663,7 @@ recv_upcalls(struct handler *handler)
* message in case it happens frequently. */
dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
dupcall->key_len, NULL, 0, NULL, 0,
&dupcall->ufid, NULL);
&dupcall->ufid, PMD_ID_NULL, NULL);
VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
"port %"PRIu32, flow->in_port.odp_port);
}
@@ -882,7 +884,7 @@ static int
upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
const struct ofpbuf *packet, enum dpif_upcall_type type,
const struct nlattr *userdata, const struct flow *flow,
const ovs_u128 *ufid)
const ovs_u128 *ufid, const int pmd_id)
{
int error;
@@ -895,6 +897,7 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
upcall->flow = flow;
upcall->packet = packet;
upcall->ufid = ufid;
upcall->pmd_id = pmd_id;
upcall->type = type;
upcall->userdata = userdata;
ofpbuf_init(&upcall->put_actions, 0);
@@ -996,9 +999,9 @@ upcall_uninit(struct upcall *upcall)
static int
upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid,
enum dpif_upcall_type type, const struct nlattr *userdata,
struct ofpbuf *actions, struct flow_wildcards *wc,
struct ofpbuf *put_actions, void *aux)
int pmd_id, enum dpif_upcall_type type,
const struct nlattr *userdata, struct ofpbuf *actions,
struct flow_wildcards *wc, struct ofpbuf *put_actions, void *aux)
{
struct udpif *udpif = aux;
unsigned int flow_limit;
@@ -1010,7 +1013,7 @@ upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid,
atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
error = upcall_receive(&upcall, udpif->backer, packet, type, userdata,
flow, ufid);
flow, ufid, pmd_id);
if (error) {
return error;
}
@@ -1259,7 +1262,7 @@ static struct udpif_key *
ukey_create__(const struct nlattr *key, size_t key_len,
const struct nlattr *mask, size_t mask_len,
bool ufid_present, const ovs_u128 *ufid,
const struct ofpbuf *actions,
const int pmd_id, const struct ofpbuf *actions,
uint64_t dump_seq, uint64_t reval_seq, long long int used)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
@@ -1273,6 +1276,7 @@ ukey_create__(const struct nlattr *key, size_t key_len,
ukey->mask_len = mask_len;
ukey->ufid_present = ufid_present;
ukey->ufid = *ufid;
ukey->pmd_id = pmd_id;
ukey->hash = get_ufid_hash(&ukey->ufid);
ukey->actions = ofpbuf_clone(actions);
@@ -1318,8 +1322,9 @@ ukey_create_from_upcall(const struct upcall *upcall)
return ukey_create__(ofpbuf_data(&keybuf), ofpbuf_size(&keybuf),
ofpbuf_data(&maskbuf), ofpbuf_size(&maskbuf),
true, upcall->ufid, &upcall->put_actions,
upcall->dump_seq, upcall->reval_seq, 0);
true, upcall->ufid, upcall->pmd_id,
&upcall->put_actions, upcall->dump_seq,
upcall->reval_seq, 0);
}
static int
@@ -1338,8 +1343,8 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
/* If the key was not provided by the datapath, fetch the full flow. */
ofpbuf_use_stack(&buf, &stub, sizeof stub);
err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid, &buf,
&full_flow);
err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid,
flow->pmd_id, &buf, &full_flow);
if (err) {
return err;
}
@@ -1350,8 +1355,9 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
*ukey = ukey_create__(flow->key, flow->key_len,
flow->mask, flow->mask_len, flow->ufid_present,
&flow->ufid, &actions, dump_seq, reval_seq,
flow->stats.used);
&flow->ufid, flow->pmd_id, &actions, dump_seq,
reval_seq, flow->stats.used);
return 0;
}
@@ -1683,6 +1689,7 @@ delete_op_init__(struct udpif *udpif, struct ukey_op *op,
op->dop.u.flow_del.key = flow->key;
op->dop.u.flow_del.key_len = flow->key_len;
op->dop.u.flow_del.ufid = flow->ufid_present ? &flow->ufid : NULL;
op->dop.u.flow_del.pmd_id = flow->pmd_id;
op->dop.u.flow_del.stats = &op->stats;
atomic_read_relaxed(&udpif->enable_ufid, &op->dop.u.flow_del.terse);
}
@@ -1695,6 +1702,7 @@ delete_op_init(struct udpif *udpif, struct ukey_op *op, struct udpif_key *ukey)
op->dop.u.flow_del.key = ukey->key;
op->dop.u.flow_del.key_len = ukey->key_len;
op->dop.u.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
op->dop.u.flow_del.pmd_id = ukey->pmd_id;
op->dop.u.flow_del.stats = &op->stats;
atomic_read_relaxed(&udpif->enable_ufid, &op->dop.u.flow_del.terse);
}

View File

@@ -3645,6 +3645,7 @@ for type in no first later; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),5
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6
@@ -3662,6 +3663,7 @@ for type in no first later; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=first), packets:0, bytes:0, used:never, actions:drop
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=later), packets:0, bytes:0, used:never, actions:drop
@@ -3679,6 +3681,7 @@ for type in no first later; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),2
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6
@@ -3750,6 +3753,7 @@ for frag in 4000 6000 6008 4010; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:74, used:0.001s, actions:1
@@ -3764,6 +3768,7 @@ for frag in 4000 6000 6008 4010; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1
@@ -3778,6 +3783,7 @@ for frag in 4000 6000 6001 4002; do
done
AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
flow-dump from non-dpdk interfaces:
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1