dpif-netdev: Streamline miss handling.

This patch avoids the relatively inefficient miss handling process
dictated by the dpif interface by calling into ofproto-dpif directly
through a callback.

Signed-off-by: Ethan Jackson <ethan@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
Author: Ethan Jackson <ethan@nicira.com>
Date:   2014-07-26 15:39:58 -07:00
commit  623540e461
parent  cc377352d1
7 changed files with 338 additions and 304 deletions
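
In outline: instead of packaging misses into dpif_upcall structures and queueing them for the dpif layer to drain, dpif-netdev now invokes a handler registered by ofproto-dpif synchronously, packet by packet. The callback contract below is a sketch inferred from the call and registration sites in this diff, not the authoritative typedef from dpif.h; parameter names are illustrative.

    /* Sketch of the inferred contract.  On a miss the handler fills
     * 'actions' (executed immediately) and optionally 'put_actions'
     * (installed as a datapath flow); returning ENOSPC means "execute
     * the actions but do not install a flow". */
    typedef int upcall_callback(struct ofpbuf *packet, struct flow *flow,
                                enum dpif_upcall_type type,
                                const struct nlattr *userdata,
                                struct ofpbuf *actions,
                                struct flow_wildcards *wc,
                                struct ofpbuf *put_actions, void *aux);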

lib/dpif-netdev.c

@@ -85,14 +85,7 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
     = SHASH_INITIALIZER(&dp_netdevs);
 
-struct dp_netdev_queue {
-    unsigned int packet_count;
-
-    struct dpif_upcall upcalls[NETDEV_MAX_RX_BATCH];
-    struct ofpbuf bufs[NETDEV_MAX_RX_BATCH];
-};
-
-#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
+static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
 
 /* Datapath based on the network device interface from netdev.h.
  *
@@ -140,7 +133,8 @@ struct dp_netdev {
     /* Protects access to ofproto-dpif-upcall interface during revalidator
      * thread synchronization. */
     struct fat_rwlock upcall_rwlock;
-    exec_upcall_cb *upcall_cb;  /* Callback function for executing upcalls. */
+    upcall_callback *upcall_cb; /* Callback function for executing upcalls. */
+    void *upcall_aux;
 
     /* Forwarding threads. */
     struct latch exit_latch;
@@ -324,12 +318,6 @@ static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
-                                            struct ofpbuf *, int type,
-                                            const struct miniflow *,
-                                            const struct nlattr *);
-static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
-                                              struct dp_netdev *);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
@@ -478,6 +466,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 
     /* Disable upcalls by default. */
     dp_netdev_disable_upcall(dp);
+    dp->upcall_aux = NULL;
     dp->upcall_cb = NULL;
 
     ovs_mutex_lock(&dp->port_mutex);
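
The "disable upcalls by default" call pairs with the fat_rwlock_tryrdlock() gates at every upcall site later in this diff. A minimal sketch of how that gating presumably works, assuming disable/enable simply hold and release the write side of upcall_rwlock (consistent with the struct comment above):

    /* Sketch: while upcalls are disabled the write lock stays held, so a
     * fast-path fat_rwlock_tryrdlock() fails immediately and the miss is
     * dropped rather than blocking a forwarding thread. */
    static void
    dp_netdev_disable_upcall(struct dp_netdev *dp)
        OVS_ACQUIRES(dp->upcall_rwlock)
    {
        fat_rwlock_wrlock(&dp->upcall_rwlock);
    }

    static void
    dp_netdev_enable_upcall(struct dp_netdev *dp)
        OVS_RELEASES(dp->upcall_rwlock)
    {
        fat_rwlock_unlock(&dp->upcall_rwlock);
    }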
@@ -1241,6 +1230,19 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
     classifier_insert(&dp->cls,
                       CONST_CAST(struct cls_rule *, &netdev_flow->cr));
 
+    if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
+        struct ds ds = DS_EMPTY_INITIALIZER;
+
+        ds_put_cstr(&ds, "flow_add: ");
+        match_format(match, &ds, OFP_DEFAULT_PRIORITY);
+        ds_put_cstr(&ds, ", actions:");
+        format_odp_actions(&ds, actions, actions_len);
+
+        VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds));
+
+        ds_destroy(&ds);
+    }
+
     return 0;
 }
@@ -1859,6 +1861,48 @@ dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
     ovs_mutex_unlock(&bucket->mutex);
 }
 
+static int
+dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
+                 struct flow *flow, struct flow_wildcards *wc,
+                 enum dpif_upcall_type type, const struct nlattr *userdata,
+                 struct ofpbuf *actions, struct ofpbuf *put_actions)
+{
+    struct ofpbuf *packet = &packet_->ofpbuf;
+
+    if (type == DPIF_UC_MISS) {
+        dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
+    }
+
+    if (OVS_UNLIKELY(!dp->upcall_cb)) {
+        return ENODEV;
+    }
+
+    if (OVS_UNLIKELY(!VLOG_DROP_DBG(&upcall_rl))) {
+        struct ds ds = DS_EMPTY_INITIALIZER;
+        struct ofpbuf key;
+        char *packet_str;
+
+        ofpbuf_init(&key, 0);
+        odp_flow_key_from_flow(&key, flow, &wc->masks, flow->in_port.odp_port,
+                               true);
+
+        packet_str = ofp_packet_to_string(ofpbuf_data(packet),
+                                          ofpbuf_size(packet));
+
+        odp_flow_key_format(ofpbuf_data(&key), ofpbuf_size(&key), &ds);
+
+        VLOG_DBG("%s: %s upcall:\n%s\n%s", dp->name,
+                 dpif_upcall_type_to_string(type), ds_cstr(&ds), packet_str);
+
+        ofpbuf_uninit(&key);
+        free(packet_str);
+        ds_destroy(&ds);
+    }
+
+    return dp->upcall_cb(packet, flow, type, userdata, actions, wc,
+                         put_actions, dp->upcall_aux);
+}
+
 struct packet_batch {
     unsigned int packet_count;
     unsigned int byte_count;
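
dp_netdev_upcall() collapses the old queueing machinery into one call: count the miss, return ENODEV when no handler is registered, emit an optional rate-limited debug message, then hand the packet to the callback. Note the logging idiom: VLOG_DROP_DBG() is the rate-limit gate and returns true when the message must be suppressed, so the relatively expensive flow-key and packet formatting runs only for messages that will actually be printed. A stripped-down version of the same idiom:

    /* Sketch of the rate-limited debug idiom used above. */
    if (!VLOG_DROP_DBG(&upcall_rl)) {
        char *packet_str = ofp_packet_to_string(ofpbuf_data(packet),
                                                ofpbuf_size(packet));
        VLOG_DBG("%s upcall:\n%s", dpif_upcall_type_to_string(type),
                 packet_str);
        free(packet_str);
    }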
@@ -1913,12 +1957,12 @@ static void
 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
                 struct pkt_metadata *md)
 {
-    struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
     struct packet_batch batches[NETDEV_MAX_RX_BATCH];
     struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
     const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
     struct cls_rule *rules[NETDEV_MAX_RX_BATCH];
     size_t n_batches, i;
+    bool any_miss;
 
     for (i = 0; i < cnt; i++) {
         if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) {
@@ -1932,7 +1976,75 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
         mfs[i] = &keys[i].flow;
     }
 
-    classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
+    any_miss = !classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
+    if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+        uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
+        struct ofpbuf actions, put_actions;
+        struct match match;
+
+        ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
+        ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
+
+        for (i = 0; i < cnt; i++) {
+            const struct dp_netdev_flow *netdev_flow;
+            struct ofpbuf *add_actions;
+            int error;
+
+            if (OVS_LIKELY(rules[i] || !mfs[i])) {
+                continue;
+            }
+
+            /* It's possible that an earlier slow path execution installed
+             * the rule this flow needs.  In this case, it's a lot cheaper
+             * to catch it here than execute a miss. */
+            netdev_flow = dp_netdev_lookup_flow(dp, mfs[i]);
+            if (netdev_flow) {
+                rules[i] = CONST_CAST(struct cls_rule *, &netdev_flow->cr);
+                continue;
+            }
+
+            miniflow_expand(mfs[i], &match.flow);
+
+            ofpbuf_clear(&actions);
+            ofpbuf_clear(&put_actions);
+            error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc,
+                                     DPIF_UC_MISS, NULL, &actions,
+                                     &put_actions);
+            if (OVS_UNLIKELY(error && error != ENOSPC)) {
+                continue;
+            }
+
+            /* We can't allow the packet batching in the next loop to execute
+             * the actions.  Otherwise, if there are any slow path actions,
+             * we'll send the packet up twice. */
+            dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
+                                      ofpbuf_data(&actions),
+                                      ofpbuf_size(&actions));
+
+            add_actions = ofpbuf_size(&put_actions)
+                ? &put_actions
+                : &actions;
+
+            ovs_mutex_lock(&dp->flow_mutex);
+            /* XXX: There's a brief race where this flow could have already
+             * been installed since we last did the flow lookup. This could be
+             * solved by moving the mutex lock outside the loop, but that's an
+             * awful long time to be locking everyone out of making flow
+             * installs. If we move to a per-core classifier, it would be
+             * reasonable. */
+            if (OVS_LIKELY(error != ENOSPC)
+                && !dp_netdev_lookup_flow(dp, mfs[i])) {
+                dp_netdev_flow_add(dp, &match, ofpbuf_data(add_actions),
+                                   ofpbuf_size(add_actions));
+            }
+            ovs_mutex_unlock(&dp->flow_mutex);
+        }
+
+        ofpbuf_uninit(&actions);
+        ofpbuf_uninit(&put_actions);
+        fat_rwlock_unlock(&dp->upcall_rwlock);
+    }
 
     n_batches = 0;
     for (i = 0; i < cnt; i++) {
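
The miss loop above is a double-checked insert: an unlocked lookup, the upcall, then a re-check under flow_mutex so the critical section stays short. Distilled from the hunk (all names come from this diff; declarations are elided):

    /* Distilled double-checked flow install. */
    netdev_flow = dp_netdev_lookup_flow(dp, mfs[i]);    /* Check 1: no lock. */
    if (!netdev_flow) {
        error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc,
                                 DPIF_UC_MISS, NULL, &actions, &put_actions);
        ovs_mutex_lock(&dp->flow_mutex);
        if (error != ENOSPC
            && !dp_netdev_lookup_flow(dp, mfs[i])) {    /* Check 2: locked. */
            dp_netdev_flow_add(dp, &match, ofpbuf_data(add_actions),
                               ofpbuf_size(add_actions));
        }
        ovs_mutex_unlock(&dp->flow_mutex);
    }

Because the second lookup runs with flow_mutex held, a concurrent install between the two checks costs only a wasted upcall, never a duplicate classifier entry; that is the race the XXX comment calls brief and tolerable.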
@@ -1940,17 +2052,7 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
         struct packet_batch *batch;
         size_t j;
 
-        if (OVS_UNLIKELY(!mfs[i])) {
-            continue;
-        }
-
-        if (OVS_UNLIKELY(!rules[i])) {
-            struct ofpbuf *buf = &packets[i]->ofpbuf;
-
-            dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
-            dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
-                                             mfs[i], NULL);
-            dpif_packet_delete(packets[i]);
+        if (OVS_UNLIKELY(!rules[i] || !mfs[i])) {
             continue;
         }
@@ -1979,10 +2081,6 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
     for (i = 0; i < n_batches; i++) {
         packet_batch_execute(&batches[i], dp);
     }
-
-    if (q.packet_count) {
-        dp_netdev_execute_userspace_queue(&q, dp);
-    }
 }
 
 static void
@@ -1996,86 +2094,16 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets,
 
     dp_netdev_input(dp, packets, cnt, &md);
 }
 
-static int
-dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
-                                 struct ofpbuf *packet, int type,
-                                 const struct miniflow *key,
-                                 const struct nlattr *userdata)
-{
-    if (q->packet_count < NETDEV_MAX_RX_BATCH) {
-        int cnt = q->packet_count;
-        struct dpif_upcall *upcall = &q->upcalls[cnt];
-        struct ofpbuf *buf = &q->bufs[cnt];
-        size_t buf_size;
-        struct flow flow;
-        void *data;
-
-        upcall->type = type;
-
-        /* Allocate buffer big enough for everything. */
-        buf_size = ODPUTIL_FLOW_KEY_BYTES;
-        if (userdata) {
-            buf_size += NLA_ALIGN(userdata->nla_len);
-        }
-        buf_size += ofpbuf_size(packet);
-        ofpbuf_init(buf, buf_size);
-
-        /* Put ODP flow. */
-        miniflow_expand(key, &flow);
-        odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port, true);
-        upcall->key = ofpbuf_data(buf);
-        upcall->key_len = ofpbuf_size(buf);
-
-        /* Put userdata. */
-        if (userdata) {
-            upcall->userdata = ofpbuf_put(buf, userdata,
-                                          NLA_ALIGN(userdata->nla_len));
-        }
-
-        /* We have to perform a copy of the packet, because we cannot send DPDK
-         * mbufs to a non pmd thread. When the upcall processing will be done
-         * in the pmd thread, this copy can be avoided */
-        data = ofpbuf_put(buf, ofpbuf_data(packet), ofpbuf_size(packet));
-        ofpbuf_use_stub(&upcall->packet, data, ofpbuf_size(packet));
-        ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
-
-        q->packet_count++;
-        return 0;
-    } else {
-        return ENOBUFS;
-    }
-}
-
-static void
-dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
-                                  struct dp_netdev *dp)
-{
-    struct dpif_upcall *upcalls = q->upcalls;
-    struct ofpbuf *bufs = q->bufs;
-    int cnt = q->packet_count;
-
-    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
-        ovs_assert(dp->upcall_cb);
-        dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
-        fat_rwlock_unlock(&dp->upcall_rwlock);
-    } else {
-        int i;
-
-        for (i = 0; i < cnt; i++) {
-            ofpbuf_uninit(&bufs[i]);
-            ofpbuf_uninit(&upcalls[i].packet);
-        }
-    }
-}
-
 struct dp_netdev_execute_aux {
     struct dp_netdev *dp;
 };
 
 static void
-dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
+                               void *aux)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
+    dp->upcall_aux = aux;
     dp->upcall_cb = cb;
 }
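
The registration now carries an opaque aux pointer that is passed back verbatim on every upcall. The expected consumer is ofproto-dpif-upcall going through the dpif layer; a hedged usage sketch, assuming the generic wrapper from the dpif-side half of this series is named dpif_register_upcall_cb() and that 'udpif' and 'upcall_cb' are the caller's own state and handler:

    /* Hypothetical caller: hand in the handler plus the state it wants
     * back as 'aux'. */
    dpif_register_upcall_cb(udpif->dpif, upcall_cb, udpif);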
@@ -2086,14 +2114,15 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dp_netdev_execute_aux *aux = aux_;
-    uint32_t *depth = recirc_depth_get();
+    struct dp_netdev *dp = aux->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
+    uint32_t *depth = recirc_depth_get();
     int i;
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
+        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
             netdev_send(p->netdev, packets, cnt, may_steal);
         } else if (may_steal) {
@@ -2103,35 +2132,39 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
         }
         break;
 
-    case OVS_ACTION_ATTR_USERSPACE: {
-        const struct nlattr *userdata;
-        struct netdev_flow_key key;
-        struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
-
-        userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
-
-        miniflow_initialize(&key.flow, key.buf);
-
-        for (i = 0; i < cnt; i++) {
-            struct ofpbuf *packet;
-
-            packet = &packets[i]->ofpbuf;
-
-            miniflow_extract(packet, md, &key.flow);
-            dp_netdev_queue_userspace_packet(&q, packet,
-                                             DPIF_UC_ACTION, &key.flow,
-                                             userdata);
-            if (may_steal) {
-                dpif_packet_delete(packets[i]);
-            }
-        }
-
-        if (q.packet_count) {
-            dp_netdev_execute_userspace_queue(&q, aux->dp);
-        }
+    case OVS_ACTION_ATTR_USERSPACE:
+        if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+            const struct nlattr *userdata;
+            struct ofpbuf actions;
+            struct flow flow;
+
+            userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
+            ofpbuf_init(&actions, 0);
+
+            for (i = 0; i < cnt; i++) {
+                int error;
+
+                ofpbuf_clear(&actions);
+
+                flow_extract(&packets[i]->ofpbuf, md, &flow);
+                error = dp_netdev_upcall(dp, packets[i], &flow, NULL,
+                                         DPIF_UC_ACTION, userdata, &actions,
+                                         NULL);
+                if (!error || error == ENOSPC) {
+                    dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
+                                              ofpbuf_data(&actions),
+                                              ofpbuf_size(&actions));
+                }
+
+                if (may_steal) {
+                    dpif_packet_delete(packets[i]);
+                }
+            }
+
+            ofpbuf_uninit(&actions);
+            fat_rwlock_unlock(&dp->upcall_rwlock);
+        }
         break;
-    }
 
     case OVS_ACTION_ATTR_HASH: {
         const struct ovs_action_hash *hash_act;
@@ -2185,7 +2218,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
             /* Hash is private to each packet */
             recirc_md.dp_hash = packets[i]->dp_hash;
 
-            dp_netdev_input(aux->dp, &recirc_pkt, 1, &recirc_md);
+            dp_netdev_input(dp, &recirc_pkt, 1, &recirc_md);
         }
         (*depth)--;