
netdev: netdev_send accepts multiple packets

The netdev_send function has been modified to accept multiple packets, to
allow netdev providers to amortize locking and queuing costs across a
batch.  This matters especially for netdev-dpdk.

Later commits exploit the new API.

Signed-off-by: Daniele Di Proietto <ddiproietto@vmware.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>

Author: Daniele Di Proietto
Date: 2014-06-23 11:43:58 -07:00
Committed-by: Pravin B Shelar
parent 910885540a
commit f4fd623c4c

8 changed files with 242 additions and 156 deletions
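
For orientation, the interface change in a nutshell (a sketch; the
batch-collection helper below is hypothetical, not part of this commit):

    /* Before: one packet per call, so providers pay locking and queuing
     * costs once per packet. */
    int netdev_send(struct netdev *, struct dpif_packet *, bool may_steal);

    /* After: an array of packets plus a count, so a provider can take its
     * TX lock once and queue the whole batch. */
    int netdev_send(struct netdev *, struct dpif_packet **, int cnt,
                    bool may_steal);

    /* Hypothetical caller batching packets before a single send: */
    struct dpif_packet *batch[32];
    int n = collect_packets(batch, 32);   /* hypothetical helper */
    netdev_send(dev, batch, n, true);     /* 'true': device steals pkts */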

File: lib/dpif-netdev.c

@@ -2123,7 +2123,7 @@ dp_execute_cb(void *aux_, struct dpif_packet *packet,
     case OVS_ACTION_ATTR_OUTPUT:
         p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
         if (p) {
-            netdev_send(p->netdev, packet, may_steal);
+            netdev_send(p->netdev, &packet, 1, may_steal);
         }
         break;

File: lib/netdev-bsd.c

@@ -686,14 +686,13 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
  * system or a tap device.
  */
 static int
-netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
+netdev_bsd_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
                 bool may_steal)
 {
     struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
     const char *name = netdev_get_name(netdev_);
-    const void *data = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
     int error;
+    int i;
 
     ovs_mutex_lock(&dev->mutex);
     if (dev->tap_fd < 0 && !dev->pcap) {
@@ -702,35 +701,43 @@ netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
         error = 0;
     }
 
-    while (!error) {
-        ssize_t retval;
-        if (dev->tap_fd >= 0) {
-            retval = write(dev->tap_fd, data, size);
-        } else {
-            retval = pcap_inject(dev->pcap, data, size);
-        }
-        if (retval < 0) {
-            if (errno == EINTR) {
-                continue;
-            } else {
-                error = errno;
-                if (error != EAGAIN) {
-                    VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: "
-                                 "%s", name, ovs_strerror(error));
+    for (i = 0; i < cnt; i++) {
+        const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+        while (!error) {
+            ssize_t retval;
+            if (dev->tap_fd >= 0) {
+                retval = write(dev->tap_fd, data, size);
+            } else {
+                retval = pcap_inject(dev->pcap, data, size);
+            }
+            if (retval < 0) {
+                if (errno == EINTR) {
+                    continue;
+                } else {
+                    error = errno;
+                    if (error != EAGAIN) {
+                        VLOG_WARN_RL(&rl, "error sending Ethernet packet on"
+                                     " %s: %s", name, ovs_strerror(error));
+                    }
                 }
+            } else if (retval != size) {
+                VLOG_WARN_RL(&rl, "sent partial Ethernet packet "
+                             "(%"PRIuSIZE" bytes of "
+                             "%"PRIuSIZE") on %s", retval, size, name);
+                error = EMSGSIZE;
+            } else {
+                break;
             }
-        } else if (retval != size) {
-            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
-                         "%"PRIuSIZE") on %s", retval, size, name);
-            error = EMSGSIZE;
-        } else {
-            break;
         }
     }
 
     ovs_mutex_unlock(&dev->mutex);
     if (may_steal) {
-        dpif_packet_delete(pkt);
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
     }
 
     return error;

File: lib/netdev-dpdk.c

@@ -206,8 +206,8 @@ dpdk_rte_mzalloc(size_t sz)
 void
 free_dpdk_buf(struct dpif_packet *p)
 {
-    struct ofpbuf *b = &p->ofpbuf;
-    struct rte_mbuf *pkt = (struct rte_mbuf *) b->dpdk_buf;
+    struct ofpbuf *ofp = &p->ofpbuf;
+    struct rte_mbuf *pkt = (struct rte_mbuf *) ofp->dpdk_buf;
 
     rte_mempool_put(pkt->pool, pkt);
 }
@@ -612,104 +612,151 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
 }
 
 inline static void
-dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf *pkt)
+dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
+                struct rte_mbuf **pkts, int cnt)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint64_t diff_tsc;
     uint64_t cur_tsc;
     uint32_t nb_tx;
+    int i = 0;
 
     rte_spinlock_lock(&txq->tx_lock);
-    txq->burst_pkts[txq->count++] = pkt;
-    if (txq->count == MAX_TX_QUEUE_LEN) {
-        goto flush;
-    }
-    cur_tsc = rte_get_timer_cycles();
-    if (txq->count == 1) {
-        txq->tsc = cur_tsc;
-    }
-    diff_tsc = cur_tsc - txq->tsc;
-    if (diff_tsc >= DRAIN_TSC) {
-        goto flush;
-    }
-    rte_spinlock_unlock(&txq->tx_lock);
-    return;
-
-flush:
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+    while (i < cnt) {
+        int freeslots = MAX_TX_QUEUE_LEN - txq->count;
+        int tocopy = MIN(freeslots, cnt-i);
+
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        if (txq->count == MAX_TX_QUEUE_LEN) {
+            goto flush;
+        }
+        cur_tsc = rte_get_timer_cycles();
+        if (txq->count == 1) {
+            txq->tsc = cur_tsc;
+        }
+        diff_tsc = cur_tsc - txq->tsc;
+        if (diff_tsc >= DRAIN_TSC) {
+            goto flush;
+        }
+        continue;
+
+    flush:
+        nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
+                                 txq->count);
+        if (nb_tx != txq->count) {
+            /* free buffers if we couldn't transmit packets */
+            rte_mempool_put_bulk(dev->dpdk_mp->mp,
+                                 (void **) &txq->burst_pkts[nb_tx],
+                                 (txq->count - nb_tx));
+        }
+        txq->count = 0;
     }
-    txq->count = 0;
     rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
 static void
-dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
+dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf *pkt;
+    struct rte_mbuf *mbufs[cnt];
+    int i, newcnt = 0;
 
-    pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
-    if (!pkt) {
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-        return;
-    }
+    for (i = 0; i < cnt; i++) {
+        int size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+        if (size > dev->max_packet_len) {
+            VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                         (int)size , dev->max_packet_len);
+
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
+
+            continue;
+        }
+
+        mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
+
+        if (!mbufs[newcnt]) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
+            return;
+        }
+
+        /* We have to do a copy for now */
+        memcpy(mbufs[newcnt]->pkt.data, ofpbuf_data(&pkts[i]->ofpbuf), size);
 
-    /* We have to do a copy for now */
-    memcpy(pkt->pkt.data, buf, size);
+        rte_pktmbuf_data_len(mbufs[newcnt]) = size;
+        rte_pktmbuf_pkt_len(mbufs[newcnt]) = size;
 
-    rte_pktmbuf_data_len(pkt) = size;
-    rte_pktmbuf_pkt_len(pkt) = size;
+        newcnt++;
+    }
 
-    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
+    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
     dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
 }
 
 static int
-netdev_dpdk_send(struct netdev *netdev,
-                 struct dpif_packet *packet, bool may_steal)
+netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
+                 bool may_steal)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct ofpbuf *ofpbuf = &packet->ofpbuf;
     int ret;
+    int i;
 
-    if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
-        VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
-                     (int)ofpbuf_size(ofpbuf) , dev->max_packet_len);
-
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-
-        ret = E2BIG;
-        goto out;
-    }
-
-    if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
-        dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), ofpbuf_size(ofpbuf));
+    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
+        dpdk_do_tx_copy(netdev, pkts, cnt);
 
         if (may_steal) {
-            dpif_packet_delete(packet);
+            for (i = 0; i < cnt; i++) {
+                dpif_packet_delete(pkts[i]);
+            }
         }
     } else {
         int qid;
+        int next_tx_idx = 0;
+        int dropped = 0;
 
         qid = rte_lcore_id() % NR_QUEUE;
 
-        dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
+        for (i = 0; i < cnt; i++) {
+            int size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+            if (OVS_UNLIKELY(size > dev->max_packet_len)) {
+                if (next_tx_idx != i) {
+                    dpdk_queue_pkts(dev, qid,
+                                    (struct rte_mbuf **)&pkts[next_tx_idx],
+                                    i-next_tx_idx);
+                }
+
+                VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                             (int)size , dev->max_packet_len);
+
+                dpif_packet_delete(pkts[i]);
+                dropped++;
+                next_tx_idx = i + 1;
+            }
+        }
+        if (next_tx_idx != cnt) {
+            dpdk_queue_pkts(dev, qid,
+                            (struct rte_mbuf **)&pkts[next_tx_idx],
+                            cnt-next_tx_idx);
+        }
+
+        if (OVS_UNLIKELY(dropped)) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped += dropped;
+            ovs_mutex_unlock(&dev->mutex);
+        }
     }
     ret = 0;
 
-out:
     return ret;
 }
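
The batching policy in dpdk_queue_pkts() above flushes on two triggers:
the software TX ring filling up (MAX_TX_QUEUE_LEN) or the oldest queued
packet aging past DRAIN_TSC timer cycles.  A self-contained sketch of just
that policy, with placeholder values (the real constants and the
rte_eth_tx_burst() plumbing live in netdev-dpdk.c):

    #include <stdbool.h>
    #include <stdint.h>

    #define MAX_TX_QUEUE_LEN 64        /* placeholder, see netdev-dpdk.c */

    struct tx_queue {
        int count;                     /* packets currently queued */
        uint64_t tsc;                  /* timestamp of first queued packet */
    };

    /* Flush when the ring is full or the first queued packet has waited
     * at least 'drain_tsc' cycles; mirrors the two 'goto flush' sites. */
    static bool
    should_flush(const struct tx_queue *txq, uint64_t cur_tsc,
                 uint64_t drain_tsc)
    {
        return txq->count == MAX_TX_QUEUE_LEN
               || cur_tsc - txq->tsc >= drain_tsc;
    }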

File: lib/netdev-dummy.c

@@ -846,51 +846,61 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
 }
 
 static int
-netdev_dummy_send(struct netdev *netdev, struct dpif_packet *pkt,
+netdev_dummy_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
                   bool may_steal)
 {
     struct netdev_dummy *dev = netdev_dummy_cast(netdev);
-    const void *buffer = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
+    int error = 0;
+    int i;
 
-    if (size < ETH_HEADER_LEN) {
-        return EMSGSIZE;
-    } else {
-        const struct eth_header *eth = buffer;
-        int max_size;
+    for (i = 0; i < cnt; i++) {
+        const void *buffer = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+        if (size < ETH_HEADER_LEN) {
+            error = EMSGSIZE;
+            break;
+        } else {
+            const struct eth_header *eth = buffer;
+            int max_size;
+
+            ovs_mutex_lock(&dev->mutex);
+            max_size = dev->mtu + ETH_HEADER_LEN;
+            ovs_mutex_unlock(&dev->mutex);
+
+            if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
+                max_size += VLAN_HEADER_LEN;
+            }
+            if (size > max_size) {
+                error = EMSGSIZE;
+                break;
+            }
+        }
 
         ovs_mutex_lock(&dev->mutex);
-        max_size = dev->mtu + ETH_HEADER_LEN;
+        dev->stats.tx_packets++;
+        dev->stats.tx_bytes += size;
+
+        dummy_packet_conn_send(&dev->conn, buffer, size);
+
+        if (dev->tx_pcap) {
+            struct ofpbuf packet;
+
+            ofpbuf_use_const(&packet, buffer, size);
+            ovs_pcap_write(dev->tx_pcap, &packet);
+            fflush(dev->tx_pcap);
+        }
         ovs_mutex_unlock(&dev->mutex);
-
-        if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
-            max_size += VLAN_HEADER_LEN;
-        }
-        if (size > max_size) {
-            return EMSGSIZE;
-        }
     }
 
-    ovs_mutex_lock(&dev->mutex);
-    dev->stats.tx_packets++;
-    dev->stats.tx_bytes += size;
-
-    dummy_packet_conn_send(&dev->conn, buffer, size);
-
-    if (dev->tx_pcap) {
-        struct ofpbuf packet;
-
-        ofpbuf_use_const(&packet, buffer, size);
-        ovs_pcap_write(dev->tx_pcap, &packet);
-        fflush(dev->tx_pcap);
-    }
-    ovs_mutex_unlock(&dev->mutex);
-
     if (may_steal) {
-        dpif_packet_delete(pkt);
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
     }
 
-    return 0;
+    return error;
 }
 
 static int

File: lib/netdev-linux.c

@@ -1012,7 +1012,7 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
             VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s",
                          ovs_strerror(errno), netdev_rxq_get_name(rxq_));
         }
-        ofpbuf_delete(buffer);
+        dpif_packet_delete(packet);
     } else {
         dp_packet_pad(buffer);
         packets[0] = packet;
@@ -1057,13 +1057,16 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
  * The kernel maintains a packet transmission queue, so the caller is not
  * expected to do additional queuing of packets. */
 static int
-netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
+netdev_linux_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
                   bool may_steal)
 {
-    const void *data = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
-
-    for (;;) {
+    int i;
+    int error = 0;
+
+    /* 'i' is incremented only if there's no error */
+    for (i = 0; i < cnt;) {
+        const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
         ssize_t retval;
 
         if (!is_tap_netdev(netdev_)) {
@@ -1113,31 +1116,41 @@ netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
             retval = write(netdev->tap_fd, data, size);
         }
 
-        if (may_steal) {
-            dpif_packet_delete(pkt);
-        }
-
         if (retval < 0) {
             /* The Linux AF_PACKET implementation never blocks waiting for room
              * for packets, instead returning ENOBUFS. Translate this into
              * EAGAIN for the caller. */
-            if (errno == ENOBUFS) {
-                return EAGAIN;
-            } else if (errno == EINTR) {
+            error = errno == ENOBUFS ? EAGAIN : errno;
+            if (error == EINTR) {
+                /* continue without incrementing 'i', i.e. retry this packet */
                 continue;
-            } else if (errno != EAGAIN) {
-                VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
-                             netdev_get_name(netdev_), ovs_strerror(errno));
             }
-            return errno;
+            break;
         } else if (retval != size) {
-            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
-                         "%"PRIuSIZE") on %s", retval, size, netdev_get_name(netdev_));
-            return EMSGSIZE;
-        } else {
-            return 0;
+            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes"
+                         " of %"PRIuSIZE") on %s", retval, size,
+                         netdev_get_name(netdev_));
+            error = EMSGSIZE;
+            break;
         }
+
+        /* Process the next packet in the batch */
+        i++;
+    }
+
+    if (may_steal) {
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
     }
+
+    if (error && error != EAGAIN) {
+        VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
+                     netdev_get_name(netdev_), ovs_strerror(error));
+    }
+    return error;
 }
 
 /* Registers with the poll loop to wake up from the next call to poll_block()
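
The loop header comment above ("'i' is incremented only if there's no
error") is the heart of the new control flow.  In isolation, and assuming a
hypothetical send_one() helper that returns 0 or a positive errno value,
the pattern is:

    for (i = 0; i < cnt;) {
        int err = send_one(pkts[i]);   /* hypothetical helper */
        if (err == EINTR) {
            continue;                  /* retry the same packet */
        } else if (err) {
            error = err;
            break;                     /* abandon the rest of the batch */
        }
        i++;                           /* success: move to the next packet */
    }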

File: lib/netdev-provider.h

@@ -250,13 +250,16 @@ struct netdev_class {
     const struct netdev_tunnel_config *
         (*get_tunnel_config)(const struct netdev *netdev);
 
-    /* Sends the buffer on 'netdev'.
-     * Returns 0 if successful, otherwise a positive errno value.  Returns
-     * EAGAIN without blocking if the packet cannot be queued immediately.
-     * Returns EMSGSIZE if a partial packet was transmitted or if the packet
-     * is too big or too small to transmit on the device.
+    /* Sends buffers on 'netdev'.
+     * Returns 0 if successful (for every buffer), otherwise a positive
+     * errno value.  Returns EAGAIN without blocking if one or more packets
+     * cannot be queued immediately.  Returns EMSGSIZE if a partial packet
+     * was transmitted or if a packet is too big or too small to transmit
+     * on the device.
      *
-     * To retain ownership of 'buffer' caller can set may_steal to false.
+     * If the function returns a non-zero value, some of the packets might
+     * have been sent anyway.
+     *
+     * To retain ownership of 'buffers' caller can set may_steal to false.
      *
      * The network device is expected to maintain a packet transmission queue,
      * so that the caller does not ordinarily have to do additional queuing of
@@ -268,7 +271,7 @@ struct netdev_class {
      * network device from being usefully used by the netdev-based "userspace
      * datapath".  It will also prevent the OVS implementation of bonding from
      * working properly over 'netdev'.) */
-    int (*send)(struct netdev *netdev, struct dpif_packet *buffer,
+    int (*send)(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
                 bool may_steal);
 
     /* Registers with the poll loop to wake up from the next call to
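
A skeletal provider conforming to the new contract might look like the
sketch below (a hypothetical 'foo' provider; foo_transmit() is invented for
illustration, while the dpif_packet/ofpbuf accessors are the ones used
throughout this commit):

    static int
    netdev_foo_send(struct netdev *netdev_, struct dpif_packet **pkts,
                    int cnt, bool may_steal)
    {
        int error = 0;
        int i;

        for (i = 0; i < cnt && !error; i++) {
            const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
            size_t size = ofpbuf_size(&pkts[i]->ofpbuf);

            error = foo_transmit(netdev_, data, size);  /* hypothetical */
        }

        if (may_steal) {
            /* Ownership of the whole batch transfers even on error, as in
             * the netdev-bsd/dummy/linux implementations above. */
            for (i = 0; i < cnt; i++) {
                dpif_packet_delete(pkts[i]);
            }
        }
        return error;
    }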

File: lib/netdev.c

@@ -650,10 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
-/* Sends 'buffer' on 'netdev'.  Returns 0 if successful, otherwise a positive
- * errno value.  Returns EAGAIN without blocking if the packet cannot be queued
- * immediately.  Returns EMSGSIZE if a partial packet was transmitted or if
- * the packet is too big or too small to transmit on the device.
+/* Sends 'buffers' on 'netdev'.  Returns 0 if successful (for every packet),
+ * otherwise a positive errno value.  Returns EAGAIN without blocking if
+ * at least one of the packets cannot be queued immediately.  Returns
+ * EMSGSIZE if a partial packet was transmitted or if a packet is too big
+ * or too small to transmit on the device.
+ *
+ * If the function returns a non-zero value, some of the packets might have
+ * been sent anyway.
  *
  * To retain ownership of 'buffer' caller can set may_steal to false.
  *
@@ -663,12 +667,13 @@ netdev_rxq_drain(struct netdev_rxq *rx)
  * Some network devices may not implement support for this function.  In such
  * cases this function will always return EOPNOTSUPP. */
 int
-netdev_send(struct netdev *netdev, struct dpif_packet *buffer, bool may_steal)
+netdev_send(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
+            bool may_steal)
 {
     int error;
 
     error = (netdev->netdev_class->send
-             ? netdev->netdev_class->send(netdev, buffer, may_steal)
+             ? netdev->netdev_class->send(netdev, buffers, cnt, may_steal)
              : EOPNOTSUPP);
     if (!error) {
         COVERAGE_INC(netdev_sent);
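
From the caller's side, the documented semantics mean an error return is
not a guarantee that nothing was transmitted.  A minimal usage sketch
(error handling only; 'pkts' and 'cnt' assumed already filled in):

    int error = netdev_send(netdev, pkts, cnt, true);
    if (error && error != EAGAIN) {
        /* Some packets may have been sent anyway, and with may_steal set
         * the buffers now belong to the device, so just report. */
        VLOG_WARN("batch send failed: %s", ovs_strerror(error));
    }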

File: lib/netdev.h

@@ -173,7 +173,8 @@ void netdev_rxq_wait(struct netdev_rxq *);
 int netdev_rxq_drain(struct netdev_rxq *);
 
 /* Packet transmission. */
-int netdev_send(struct netdev *, struct dpif_packet *, bool may_steal);
+int netdev_send(struct netdev *, struct dpif_packet **, int cnt,
+                bool may_steal);
 
 void netdev_send_wait(struct netdev *);
 
 /* Hardware address. */