diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index 447573186..b98413da4 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1934,6 +1934,9 @@ const struct dpif_class dpif_linux_class = {
     dpif_linux_recv,
     dpif_linux_recv_wait,
     dpif_linux_recv_purge,
+    NULL,                       /* register_upcall_cb */
+    NULL,                       /* enable_upcall */
+    NULL,                       /* disable_upcall */
 };
 
 static int
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 8422c8975..20813a8fd 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 
-/* Queues. */
-enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
-enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
-BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
-
 /* Protects against changes to 'dp_netdevs'. */
 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 
@@ -90,26 +85,14 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
     = SHASH_INITIALIZER(&dp_netdevs);
 
-struct dp_netdev_upcall {
-    struct dpif_upcall upcall;  /* Queued upcall information. */
-    struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
+struct dp_netdev_queue {
+    unsigned int packet_count;
+
+    struct dpif_upcall upcalls[NETDEV_MAX_RX_BATCH];
+    struct ofpbuf bufs[NETDEV_MAX_RX_BATCH];
 };
 
-/* A queue passing packets from a struct dp_netdev to its clients (handlers).
- *
- *
- * Thread-safety
- * =============
- *
- * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
- * its own mutex. */
-struct dp_netdev_queue {
-    struct ovs_mutex mutex;
-    struct seq *seq;      /* Incremented whenever a packet is queued. */
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
-    unsigned int head OVS_GUARDED;
-    unsigned int tail OVS_GUARDED;
-};
+#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
 
 /* Datapath based on the network device interface from netdev.h.
  *
@@ -125,11 +108,11 @@ struct dp_netdev_queue {
  *    dp_netdev_mutex (global)
  *    port_mutex
  *    flow_mutex
- *    queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
     const char *const name;
+    struct dpif *dpif;
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
@@ -142,15 +125,6 @@ struct dp_netdev {
     struct classifier cls;
     struct cmap flow_table OVS_GUARDED; /* Flow table. */
 
-    /* Queues.
-     *
-     * 'queue_rwlock' protects the modification of 'handler_queues' and
-     * 'n_handlers'.  The queue elements are protected by its
-     * 'handler_queues''s mutex. */
-    struct fat_rwlock queue_rwlock;
-    struct dp_netdev_queue *handler_queues;
-    uint32_t n_handlers;
-
     /* Statistics.
      *
      * ovsthread_stats is internally synchronized. */
@@ -163,6 +137,11 @@ struct dp_netdev {
     struct cmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* Protects access to ofproto-dpif-upcall interface during revalidator
+     * thread synchronization. */
+    struct fat_rwlock upcall_rwlock;
+    exec_upcall_cb *upcall_cb;  /* Callback function for executing upcalls. */
+
     /* Forwarding threads. */
     struct latch exit_latch;
     struct pmd_thread *pmd_threads;
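The replacement queue is deliberately trivial: it is stack-allocated per input batch and touched by only one thread, which is why the old mutex, seq, and ring indices can go away entirely. A minimal standalone sketch of the pattern (not OVS code; BATCH stands in for NETDEV_MAX_RX_BATCH, plain ints stand in for the dpif_upcall/ofpbuf pairs, and -1 stands in for ENOBUFS):

    #include <stdio.h>

    #define BATCH 32

    struct miss_queue {
        unsigned int packet_count;
        int pkts[BATCH];            /* stands in for upcalls[]/bufs[] */
    };

    #define MISS_QUEUE_INITIALIZER { .packet_count = 0 }

    static int
    queue_miss(struct miss_queue *q, int pkt)
    {
        if (q->packet_count < BATCH) {
            q->pkts[q->packet_count++] = pkt;
            return 0;
        }
        return -1;                  /* queue full: caller drops the packet */
    }

    static void
    flush_misses(struct miss_queue *q)
    {
        unsigned int i;

        for (i = 0; i < q->packet_count; i++) {
            printf("upcall for pkt %d\n", q->pkts[i]);
        }
    }

    static void
    input_batch(const int *pkts, int cnt)
    {
        struct miss_queue q = MISS_QUEUE_INITIALIZER;  /* per invocation */
        int i;

        for (i = 0; i < cnt; i++) {
            queue_miss(&q, pkts[i]);    /* flow-table miss path */
        }
        if (q.packet_count) {
            flush_misses(&q);           /* one flush per input batch */
        }
    }

    int main(void)
    {
        int pkts[] = { 1, 2, 3 };

        input_batch(pkts, 3);
        return 0;
    }

Because the queue never outlives one call into the input path, no other thread can observe it, and the capacity check replaces the old head/tail ring arithmetic.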
@@ -339,14 +318,14 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
     OVS_REQUIRES(dp->port_mutex);
 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
-static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                      int queue_no, int type,
-                                      const struct miniflow *,
-                                      const struct nlattr *userdata);
+static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
+                                            struct ofpbuf *, int type,
+                                            const struct miniflow *,
+                                            const struct nlattr *);
+static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
+                                              struct dp_netdev *);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
@@ -357,6 +336,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
                                  odp_port_t port_no);
 
 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
+static void dp_netdev_disable_upcall(struct dp_netdev *);
 
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
@@ -484,14 +464,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     cmap_init(&dp->flow_table);
 
-    fat_rwlock_init(&dp->queue_rwlock);
-
     ovsthread_stats_init(&dp->stats);
 
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
     latch_init(&dp->exit_latch);
+    fat_rwlock_init(&dp->upcall_rwlock);
+
+    /* Disable upcalls by default. */
+    dp_netdev_disable_upcall(dp);
+    dp->upcall_cb = NULL;
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -523,31 +506,13 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
     }
     if (!error) {
         *dpifp = create_dpif_netdev(dp);
+        dp->dpif = *dpifp;
     }
     ovs_mutex_unlock(&dp_netdev_mutex);
 
     return error;
 }
 
-static void
-dp_netdev_purge_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    int i;
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_lock(&q->mutex);
-        while (q->tail != q->head) {
-            struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-            ofpbuf_uninit(&u->upcall.packet);
-            ofpbuf_uninit(&u->buf);
-        }
-        ovs_mutex_unlock(&q->mutex);
-    }
-}
-
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */
 static void
@@ -576,17 +541,12 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovsthread_stats_destroy(&dp->stats);
 
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    dp_netdev_destroy_all_queues(dp);
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    fat_rwlock_destroy(&dp->queue_rwlock);
-
     classifier_destroy(&dp->cls);
     cmap_destroy(&dp->flow_table);
     ovs_mutex_destroy(&dp->flow_mutex);
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
+    fat_rwlock_destroy(&dp->upcall_rwlock);
     latch_destroy(&dp->exit_latch);
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -1559,80 +1519,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
-static void
-dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    size_t i;
-
-    dp_netdev_purge_queues(dp);
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_destroy(&q->mutex);
-        seq_destroy(q->seq);
-    }
-    free(dp->handler_queues);
-    dp->handler_queues = NULL;
-    dp->n_handlers = 0;
-}
-
-static void
-dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    if (dp->n_handlers != n_handlers) {
-        size_t i;
-
-        dp_netdev_destroy_all_queues(dp);
-
-        dp->n_handlers = n_handlers;
-        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
-
-        for (i = 0; i < n_handlers; i++) {
-            struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-            ovs_mutex_init(&q->mutex);
-            q->seq = seq_create();
-        }
-    }
-}
-
-static int
-dpif_netdev_recv_set(struct dpif *dpif, bool enable)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    if ((dp->handler_queues != NULL) == enable) {
-        return 0;
-    }
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (!enable) {
-        dp_netdev_destroy_all_queues(dp);
-    } else {
-        dp_netdev_refresh_queues(dp, 1);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
-static int
-dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (dp->handler_queues) {
-        dp_netdev_refresh_queues(dp, n_handlers);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority)
@@ -1641,97 +1527,6 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 {
     return 0;
 }
 
-static bool
-dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
-    OVS_REQ_RDLOCK(dp->queue_rwlock)
-{
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
-    if (!dp->handler_queues) {
-        VLOG_WARN_RL(&rl, "receiving upcall disabled");
-        return false;
-    }
-
-    if (handler_id >= dp->n_handlers) {
-        VLOG_WARN_RL(&rl, "handler index out of bound");
-        return false;
-    }
-
-    return true;
-}
-
-static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
-                 struct dpif_upcall *upcall, struct ofpbuf *buf)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    int error = 0;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        error = EAGAIN;
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    if (q->head != q->tail) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-
-        *upcall = u->upcall;
-
-        ofpbuf_uninit(buf);
-        *buf = u->buf;
-    } else {
-        error = EAGAIN;
-    }
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return error;
-}
-
-static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    uint64_t seq;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    seq = seq_read(q->seq);
-    if (q->head != q->tail) {
-        poll_immediate_wake();
-    } else {
-        seq_wait(q->seq, seq);
-    }
-
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-}
-
-static void
-dpif_netdev_recv_purge(struct dpif *dpif)
-{
-    struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-
-    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
-    dp_netdev_purge_queues(dpif_netdev->dp);
-    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
-}
 
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
  * of 1, whose actions are a copy of the 'ofpacts_len' bytes of
@@ -1918,6 +1713,36 @@ reload:
     return NULL;
 }
 
+static void
+dp_netdev_disable_upcall(struct dp_netdev *dp)
+    OVS_ACQUIRES(dp->upcall_rwlock)
+{
+    fat_rwlock_wrlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_disable_upcall(struct dpif *dpif)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp_netdev_disable_upcall(dp);
+}
+
+static void
+dp_netdev_enable_upcall(struct dp_netdev *dp)
+    OVS_RELEASES(dp->upcall_rwlock)
+{
+    fat_rwlock_unlock(&dp->upcall_rwlock);
+}
+
+static void
+dpif_netdev_enable_upcall(struct dpif *dpif)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp_netdev_enable_upcall(dp);
+}
+
 static void
 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
 {
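These four functions are the whole synchronization story for the new scheme: "disable" simply acquires upcall_rwlock for writing and keeps it, so the fat_rwlock_tryrdlock() on the packet path (see dp_netdev_execute_userspace_queue below) fails and misses are dropped rather than blocking a poll loop, while "enable" releases it. A standalone sketch of the idiom, with pthread_rwlock_t standing in for struct fat_rwlock (single-threaded here for brevity, whereas OVS disables from the main thread while pmd threads take the read side; build with -pthread):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    static pthread_rwlock_t upcall_rwlock = PTHREAD_RWLOCK_INITIALIZER;

    /* "Disable" = hold the write lock; "enable" = release it. */
    static void disable_upcalls(void) { pthread_rwlock_wrlock(&upcall_rwlock); }
    static void enable_upcalls(void)  { pthread_rwlock_unlock(&upcall_rwlock); }

    /* Returns true if the upcall was delivered, false if it was dropped. */
    static bool
    try_upcall(const char *pkt)
    {
        if (!pthread_rwlock_tryrdlock(&upcall_rwlock)) {
            printf("upcall: %s\n", pkt);   /* would invoke the callback */
            pthread_rwlock_unlock(&upcall_rwlock);
            return true;
        }
        return false;                      /* disabled: caller drops pkt */
    }

    int main(void)
    {
        disable_upcalls();                 /* disabled by default */
        printf("delivered: %d\n", try_upcall("p0"));   /* 0: dropped */
        enable_upcalls();
        printf("delivered: %d\n", try_upcall("p1"));   /* 1: delivered */
        return 0;
    }

Using trylock on the read side is the key design choice: a pmd thread never sleeps on this lock, so revalidator synchronization can quiesce the upcall path without stalling packet forwarding.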
@@ -2056,6 +1881,7 @@ static void
 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
                 struct pkt_metadata *md)
 {
+    struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
     struct packet_batch batches[NETDEV_MAX_RX_BATCH];
     struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
     const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
@@ -2087,17 +1913,11 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
         }
 
         if (OVS_UNLIKELY(!rules[i])) {
+            struct ofpbuf *buf = &packets[i]->ofpbuf;
 
             dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
-
-            if (OVS_LIKELY(dp->handler_queues)) {
-                uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
-                struct ofpbuf *buf = &packets[i]->ofpbuf;
-
-                dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers,
-                                           DPIF_UC_MISS, mfs[i], NULL);
-            }
-
+            dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
+                                             mfs[i], NULL);
             dpif_packet_delete(packets[i]);
             continue;
         }
@@ -2127,6 +1947,10 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
     for (i = 0; i < n_batches; i++) {
         packet_batch_execute(&batches[i], dp);
     }
+
+    if (q.packet_count) {
+        dp_netdev_execute_userspace_queue(&q, dp);
+    }
 }
 
 static void
@@ -2145,12 +1969,11 @@ dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
                                  struct ofpbuf *packet, int type,
                                  const struct miniflow *key,
                                  const struct nlattr *userdata)
-OVS_REQUIRES(q->mutex)
 {
-    if (q->head - q->tail < MAX_QUEUE_LEN) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
-        struct dpif_upcall *upcall = &u->upcall;
-        struct ofpbuf *buf = &u->buf;
+    if (q->packet_count < NETDEV_MAX_RX_BATCH) {
+        int cnt = q->packet_count;
+        struct dpif_upcall *upcall = &q->upcalls[cnt];
+        struct ofpbuf *buf = &q->bufs[cnt];
         size_t buf_size;
         struct flow flow;
         void *data;
@@ -2174,7 +1997,7 @@ OVS_REQUIRES(q->mutex)
         /* Put userdata. */
         if (userdata) {
             upcall->userdata = ofpbuf_put(buf, userdata,
-                                              NLA_ALIGN(userdata->nla_len));
+                                          NLA_ALIGN(userdata->nla_len));
         }
 
         /* We have to perform a copy of the packet, because we cannot send DPDK
@@ -2184,41 +2007,46 @@ OVS_REQUIRES(q->mutex)
         ofpbuf_use_stub(&upcall->packet, data, ofpbuf_size(packet));
         ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
 
-        seq_change(q->seq);
-
+        q->packet_count++;
         return 0;
     } else {
         return ENOBUFS;
     }
 }
 
-static int
-dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, int type,
-                           const struct miniflow *key,
-                           const struct nlattr *userdata)
+static void
+dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
+                                  struct dp_netdev *dp)
 {
-    struct dp_netdev_queue *q;
-    int error;
+    struct dpif_upcall *upcalls = q->upcalls;
+    struct ofpbuf *bufs = q->bufs;
+    int cnt = q->packet_count;
 
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-    q = &dp->handler_queues[queue_no];
-    ovs_mutex_lock(&q->mutex);
-    error = dp_netdev_queue_userspace_packet(q, packet, type, key,
-                                             userdata);
-    if (error == ENOBUFS) {
-        dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
+    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+        ovs_assert(dp->upcall_cb);
+        dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
+        fat_rwlock_unlock(&dp->upcall_rwlock);
+    } else {
+        int i;
+
+        for (i = 0; i < cnt; i++) {
+            ofpbuf_uninit(&bufs[i]);
+            ofpbuf_uninit(&upcalls[i].packet);
+        }
     }
-    ovs_mutex_unlock(&q->mutex);
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return error;
 }
 
 struct dp_netdev_execute_aux {
     struct dp_netdev *dp;
 };
 
+static void
+dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    dp->upcall_cb = cb;
+}
+
 static void
 dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
               struct pkt_metadata *md,
@@ -2246,6 +2074,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     case OVS_ACTION_ATTR_USERSPACE: {
         const struct nlattr *userdata;
         struct netdev_flow_key key;
+        struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
 
         userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
 
@@ -2258,15 +2087,17 @@
 
             miniflow_extract(packet, md, &key.flow);
 
-            dp_netdev_output_userspace(aux->dp, packet,
-                                       miniflow_hash_5tuple(&key.flow, 0)
-                                           % aux->dp->n_handlers,
-                                       DPIF_UC_ACTION, &key.flow,
-                                       userdata);
+            dp_netdev_queue_userspace_packet(&q, packet,
+                                             DPIF_UC_ACTION, &key.flow,
+                                             userdata);
             if (may_steal) {
                 dpif_packet_delete(packets[i]);
             }
         }
+
+        if (q.packet_count) {
+            dp_netdev_execute_userspace_queue(&q, aux->dp);
+        }
         break;
     }
 
@@ -2392,12 +2223,15 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_flow_dump_next,
     dpif_netdev_execute,
     NULL,                       /* operate */
-    dpif_netdev_recv_set,
-    dpif_netdev_handlers_set,
+    NULL,                       /* recv_set */
+    NULL,                       /* handlers_set */
     dpif_netdev_queue_to_priority,
-    dpif_netdev_recv,
-    dpif_netdev_recv_wait,
-    dpif_netdev_recv_purge,
+    NULL,                       /* recv */
+    NULL,                       /* recv_wait */
+    NULL,                       /* recv_purge */
+    dpif_netdev_register_upcall_cb,
+    dpif_netdev_enable_upcall,
+    dpif_netdev_disable_upcall,
 };
 
 static void
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index 0f42d7a94..410fcfa15 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -20,6 +20,7 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#include "dpif.h"
 #include "openvswitch/types.h"
 #include "ofpbuf.h"
 #include "packets.h"
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index 6a06cf8dd..bf24a9d04 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -411,6 +411,22 @@ struct dpif_class {
     /* Throws away any queued upcalls that 'dpif' currently has ready to
      * return. */
     void (*recv_purge)(struct dpif *dpif);
+
+    /* For datapaths that run in userspace (i.e. dpif-netdev), threads polling
+     * for incoming packets can directly call upcall functions instead of
+     * offloading packet processing to separate handler threads.  Datapaths
+     * that directly call upcall functions should use the functions below to
+     * register an upcall function and enable / disable upcalls.
+     *
+     * Registers an upcall callback function with 'dpif'.  This is only used
+     * if 'dpif' directly executes upcall functions. */
+    void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *);
+
+    /* Enables upcalls if 'dpif' directly executes upcall functions. */
+    void (*enable_upcall)(struct dpif *);
+
+    /* Disables upcalls if 'dpif' directly executes upcall functions. */
+    void (*disable_upcall)(struct dpif *);
 };
 
 extern const struct dpif_class dpif_linux_class;
diff --git a/lib/dpif.c b/lib/dpif.c
index a3258057c..350156947 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1305,8 +1305,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)
 int
 dpif_recv_set(struct dpif *dpif, bool enable)
 {
-    int error = dpif->dpif_class->recv_set(dpif, enable);
-    log_operation(dpif, "recv_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->recv_set) {
+        error = dpif->dpif_class->recv_set(dpif, enable);
+        log_operation(dpif, "recv_set", error);
+    }
     return error;
 }
 
@@ -1333,11 +1337,61 @@ dpif_recv_set(struct dpif *dpif, bool enable)
 int
 dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
 {
-    int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
-    log_operation(dpif, "handlers_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->handlers_set) {
+        error = dpif->dpif_class->handlers_set(dpif, n_handlers);
+        log_operation(dpif, "handlers_set", error);
+    }
     return error;
 }
 
+void
+dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
+{
+    if (dpif->dpif_class->register_upcall_cb) {
+        dpif->dpif_class->register_upcall_cb(dpif, cb);
+    }
+}
+
+void
+dpif_enable_upcall(struct dpif *dpif)
+{
+    if (dpif->dpif_class->enable_upcall) {
+        dpif->dpif_class->enable_upcall(dpif);
+    }
+}
+
+void
+dpif_disable_upcall(struct dpif *dpif)
+{
+    if (dpif->dpif_class->disable_upcall) {
+        dpif->dpif_class->disable_upcall(dpif);
+    }
+}
+
+void
+dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
+{
+    if (!VLOG_DROP_DBG(&dpmsg_rl)) {
+        struct ds flow;
+        char *packet;
+
+        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
+                                      ofpbuf_size(&upcall->packet));
+
+        ds_init(&flow);
+        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+
+        VLOG_DBG("%s: %s upcall:\n%s\n%s",
+                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
+                 ds_cstr(&flow), packet);
+
+        ds_destroy(&flow);
+        free(packet);
+    }
+}
+
 /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
  * can be multiple poll loops, 'handler_id' is needed as index to
  * identify the corresponding poll loop.  If successful, stores the upcall
@@ -1360,25 +1414,15 @@ int
 dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
           struct ofpbuf *buf)
 {
-    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
-    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
-        struct ds flow;
-        char *packet;
+    int error = EAGAIN;
 
-        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
-                                      ofpbuf_size(&upcall->packet));
-
-        ds_init(&flow);
-        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
-
-        VLOG_DBG("%s: %s upcall:\n%s\n%s",
-                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
-                 ds_cstr(&flow), packet);
-
-        ds_destroy(&flow);
-        free(packet);
-    } else if (error && error != EAGAIN) {
-        log_operation(dpif, "recv", error);
+    if (dpif->dpif_class->recv) {
+        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
+        if (!error) {
+            dpif_print_packet(dpif, upcall);
+        } else if (error != EAGAIN) {
+            log_operation(dpif, "recv", error);
+        }
     }
     return error;
 }
@@ -1401,7 +1445,9 @@ dpif_recv_purge(struct dpif *dpif)
 void
 dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
-    dpif->dpif_class->recv_wait(dpif, handler_id);
+    if (dpif->dpif_class->recv_wait) {
+        dpif->dpif_class->recv_wait(dpif, handler_id);
+    }
 }
 
 /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
diff --git a/lib/dpif.h b/lib/dpif.h
index 94bcacc29..8d8e43a26 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -671,12 +671,20 @@ struct dpif_upcall {
     struct nlattr *userdata;    /* Argument to OVS_ACTION_ATTR_USERSPACE. */
 };
 
+typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
+                            struct ofpbuf *, int cnt);
+
 int dpif_recv_set(struct dpif *, bool enable);
 int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
 int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
               struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *, uint32_t handler_id);
+void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *);
+void dpif_enable_upcall(struct dpif *);
+void dpif_disable_upcall(struct dpif *);
+
+void dpif_print_packet(struct dpif *, struct dpif_upcall *);
 
 /* Miscellaneous. */
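Together with the dpif-provider.h hooks above, the intended calling convention for a datapath client looks roughly like the following sketch. The callback and wiring function here are hypothetical, shown only to illustrate the exec_upcall_cb typedef and the ordering; the real callback is ofproto-dpif-upcall's exec_upcalls(), registered in udpif_create() below, and the sketch assumes it is compiled inside the OVS tree:

    #include "dpif.h"     /* dpif_register_upcall_cb() etc., added above */
    #include "ofpbuf.h"

    /* Hypothetical callback matching the exec_upcall_cb typedef. */
    static void
    my_exec_upcalls(struct dpif *dpif, struct dpif_upcall *upcalls,
                    struct ofpbuf *bufs, int cnt)
    {
        int i;

        for (i = 0; i < cnt; i++) {
            dpif_print_packet(dpif, &upcalls[i]);
            /* ... translation would happen here ... */
            ofpbuf_uninit(&upcalls[i].packet);  /* callback owns buffers */
            ofpbuf_uninit(&bufs[i]);
        }
    }

    /* Wiring order used by this patch: register once at creation time
     * (udpif_create), enable only while handler threads are running
     * (udpif_start_threads), disable before tearing them down
     * (udpif_stop_threads). */
    static void
    wire_direct_upcalls(struct dpif *dpif)
    {
        dpif_register_upcall_cb(dpif, my_exec_upcalls);
        dpif_enable_upcall(dpif);
    }

Note that ownership of the upcall and buffer arrays passes to the callback for the duration of the call: on the disabled path the datapath uninits them itself, so a callback that keeps packets must copy them out before returning.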
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index df33643c8..f00c17f45 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -40,7 +40,7 @@
 #include "vlog.h"
 
 #define MAX_QUEUE_LENGTH 512
-#define UPCALL_MAX_BATCH 50
+#define UPCALL_MAX_BATCH 64
 #define REVALIDATE_MAX_BATCH 50
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
@@ -201,7 +201,9 @@ static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
 static size_t read_upcalls(struct handler *,
                            struct upcall upcalls[UPCALL_MAX_BATCH]);
-static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
+static void free_upcall(struct upcall *);
+static int convert_upcall(struct udpif *, struct upcall *);
+static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
 static void udpif_stop_threads(struct udpif *);
 static void udpif_start_threads(struct udpif *, size_t n_handlers,
                                 size_t n_revalidators);
@@ -266,6 +268,8 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
     ovs_mutex_init(&udpif->n_flows_mutex);
 
+    dpif_register_upcall_cb(dpif, exec_upcalls);
+
     return udpif;
 }
 
@@ -317,6 +321,8 @@ udpif_stop_threads(struct udpif *udpif)
             xpthread_join(udpif->revalidators[i].thread, NULL);
         }
 
+        dpif_disable_upcall(udpif->dpif);
+
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
@@ -367,6 +373,8 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
+        dpif_enable_upcall(udpif->dpif);
+
         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
@@ -539,12 +547,10 @@ udpif_upcall_handler(void *arg)
             latch_wait(&udpif->exit_latch);
             poll_block();
         } else {
-            handle_upcalls(handler, upcalls, n_upcalls);
+            handle_upcalls(handler->udpif, upcalls, n_upcalls);
 
             for (i = 0; i < n_upcalls; i++) {
-                xlate_out_uninit(&upcalls[i].xout);
-                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
-                ofpbuf_uninit(&upcalls[i].upcall_buf);
+                free_upcall(&upcalls[i]);
             }
         }
         coverage_clear();
@@ -751,6 +757,63 @@ upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
     xlate_actions(&xin, &upcall->xout);
 }
 
+void
+free_upcall(struct upcall *upcall)
+{
+    xlate_out_uninit(&upcall->xout);
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+}
+
+static struct udpif *
+find_udpif(struct dpif *dpif)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        if (udpif->dpif == dpif) {
+            return udpif;
+        }
+    }
+    return NULL;
+}
+
+void
+exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
+             struct ofpbuf *bufs, int cnt)
+{
+    struct upcall upcalls[UPCALL_MAX_BATCH];
+    struct udpif *udpif;
+    int i, j;
+
+    udpif = find_udpif(dpif);
+    ovs_assert(udpif);
+
+    for (i = 0; i < cnt; i += UPCALL_MAX_BATCH) {
+        size_t n_upcalls = 0;
+        for (j = i; j < MIN(i + UPCALL_MAX_BATCH, cnt); j++) {
+            struct upcall *upcall = &upcalls[n_upcalls];
+            struct dpif_upcall *dupcall = &dupcalls[j];
+            struct ofpbuf *buf = &bufs[j];
+
+            upcall->dpif_upcall = *dupcall;
+            upcall->upcall_buf = *buf;
+
+            dpif_print_packet(dpif, dupcall);
+            if (!convert_upcall(udpif, upcall)) {
+                n_upcalls += 1;
+            }
+        }
+
+        if (n_upcalls) {
+            handle_upcalls(udpif, upcalls, n_upcalls);
+            for (j = 0; j < n_upcalls; j++) {
+                free_upcall(&upcalls[j]);
+            }
+        }
+    }
+}
+
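The i/j double loop in exec_upcalls() is plain fixed-size chunking: however many packets the datapath hands over in one callback, at most UPCALL_MAX_BATCH struct upcall conversions are live at a time, keeping the stack array bounded. The same pattern in isolation, with BATCH standing in for UPCALL_MAX_BATCH:

    #include <stdio.h>

    #define BATCH 64
    #define MIN(a, b) ((a) < (b) ? (a) : (b))

    static void
    process_in_batches(int cnt)
    {
        int i, j;

        for (i = 0; i < cnt; i += BATCH) {
            int n = 0;

            for (j = i; j < MIN(i + BATCH, cnt); j++) {
                n++;                    /* convert_upcall() would run here */
            }
            printf("batch at %d: %d items\n", i, n);  /* handle_upcalls() */
        }
    }

    int main(void)
    {
        process_in_batches(150);        /* prints batches of 64, 64, 22 */
        return 0;
    }
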
 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
  * read. */
 static size_t
@@ -764,14 +827,6 @@ read_upcalls(struct handler *handler,
     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
         struct upcall *upcall = &upcalls[n_upcalls];
-        struct dpif_upcall *dupcall;
-        struct ofpbuf *packet;
-        struct ofproto_dpif *ofproto;
-        struct dpif_sflow *sflow;
-        struct dpif_ipfix *ipfix;
-        struct flow flow;
-        enum upcall_type type;
-        odp_port_t odp_in_port;
         int error;
 
         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
@@ -783,91 +838,107 @@ read_upcalls(struct handler *handler,
             break;
         }
 
-        dupcall = &upcall->dpif_upcall;
-        packet = &dupcall->packet;
-        error = xlate_receive(udpif->backer, packet, dupcall->key,
-                              dupcall->key_len, &flow,
-                              &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
-        if (error) {
-            if (error == ENODEV) {
-                /* Received packet on datapath port for which we couldn't
-                 * associate an ofproto.  This can happen if a port is removed
-                 * while traffic is being received.  Print a rate-limited
-                 * message in case it happens frequently.  Install a drop flow
-                 * so that future packets of the flow are inexpensively dropped
-                 * in the kernel. */
-                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
-                             "port %"PRIu32, odp_in_port);
-                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
-                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
-                              NULL);
-            }
-            goto destroy_upcall;
+        if (!convert_upcall(udpif, upcall)) {
+            n_upcalls += 1;
         }
-
-        type = classify_upcall(upcall);
-        if (type == MISS_UPCALL) {
-            upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
-            n_upcalls++;
-            continue;
-        }
-
-        switch (type) {
-        case SFLOW_UPCALL:
-            if (sflow) {
-                union user_action_cookie cookie;
-
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.sflow);
-                dpif_sflow_received(sflow, packet, &flow, odp_in_port,
-                                    &cookie);
-            }
-            break;
-        case IPFIX_UPCALL:
-            if (ipfix) {
-                dpif_ipfix_bridge_sample(ipfix, packet, &flow);
-            }
-            break;
-        case FLOW_SAMPLE_UPCALL:
-            if (ipfix) {
-                union user_action_cookie cookie;
-
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.flow_sample);
-
-                /* The flow reflects exactly the contents of the packet.
-                 * Sample the packet using it. */
-                dpif_ipfix_flow_sample(ipfix, packet, &flow,
-                                       cookie.flow_sample.collector_set_id,
-                                       cookie.flow_sample.probability,
-                                       cookie.flow_sample.obs_domain_id,
-                                       cookie.flow_sample.obs_point_id);
-            }
-            break;
-        case BAD_UPCALL:
-            break;
-        case MISS_UPCALL:
-            OVS_NOT_REACHED();
-        }
-
-        dpif_ipfix_unref(ipfix);
-        dpif_sflow_unref(sflow);
-
-destroy_upcall:
-        ofpbuf_uninit(&upcall->dpif_upcall.packet);
-        ofpbuf_uninit(&upcall->upcall_buf);
     }
-
     return n_upcalls;
 }
 
+int
+convert_upcall(struct udpif *udpif, struct upcall *upcall)
+{
+    struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+    struct ofpbuf *packet = &dupcall->packet;
+    struct ofproto_dpif *ofproto;
+    struct dpif_sflow *sflow;
+    struct dpif_ipfix *ipfix;
+    struct flow flow;
+    enum upcall_type type;
+    odp_port_t odp_in_port;
+    int error;
+
+    error = xlate_receive(udpif->backer, packet, dupcall->key,
+                          dupcall->key_len, &flow,
+                          &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
+
+    if (error) {
+        if (error == ENODEV) {
+            /* Received packet on datapath port for which we couldn't
+             * associate an ofproto.  This can happen if a port is removed
+             * while traffic is being received.  Print a rate-limited
+             * message in case it happens frequently.  Install a drop flow
+             * so that future packets of the flow are inexpensively dropped
+             * in the kernel. */
+            VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+                         "port %"PRIu32, odp_in_port);
+            dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+                          dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+                          NULL);
+        }
+        goto destroy_upcall;
+    }
+
+    type = classify_upcall(upcall);
+    if (type == MISS_UPCALL) {
+        upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
+        return error;
+    }
+
+    switch (type) {
+    case SFLOW_UPCALL:
+        if (sflow) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.sflow);
+            dpif_sflow_received(sflow, packet, &flow, odp_in_port,
+                                &cookie);
+        }
+        break;
+    case IPFIX_UPCALL:
+        if (ipfix) {
+            dpif_ipfix_bridge_sample(ipfix, packet, &flow);
+        }
+        break;
+    case FLOW_SAMPLE_UPCALL:
+        if (ipfix) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.flow_sample);
+
+            /* The flow reflects exactly the contents of the packet.
+             * Sample the packet using it. */
+            dpif_ipfix_flow_sample(ipfix, packet, &flow,
+                                   cookie.flow_sample.collector_set_id,
+                                   cookie.flow_sample.probability,
+                                   cookie.flow_sample.obs_domain_id,
+                                   cookie.flow_sample.obs_point_id);
+        }
+        break;
+    case BAD_UPCALL:
+        break;
+    case MISS_UPCALL:
+        OVS_NOT_REACHED();
+    }
+
+    dpif_ipfix_unref(ipfix);
+    dpif_sflow_unref(sflow);
+    error = EAGAIN;
+
+destroy_upcall:
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+    return error;
+}
+
 static void
-handle_upcalls(struct handler *handler, struct upcall *upcalls,
+handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
                size_t n_upcalls)
 {
-    struct udpif *udpif = handler->udpif;
     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
     size_t n_ops, i;
diff --git a/ofproto/ofproto-dpif-upcall.h b/ofproto/ofproto-dpif-upcall.h
index 8c4b655c7..2b197ada6 100644
--- a/ofproto/ofproto-dpif-upcall.h
+++ b/ofproto/ofproto-dpif-upcall.h
@@ -19,6 +19,8 @@
 
 struct dpif;
 struct dpif_backer;
+struct dpif_upcall;
+struct ofpbuf;
 struct seq;
 struct simap;
 
@@ -26,6 +28,9 @@ struct simap;
  * them.  Additionally, it's responsible for maintaining the datapath flow
  * table. */
 
+void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *,
+                  int cnt);
+
 struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
 void udpif_run(struct udpif *udpif);
 void udpif_set_threads(struct udpif *, size_t n_handlers,