2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-30 22:05:19 +00:00

netdev: netdev_send accepts multiple packets

The netdev_send function has been modified to accept multiple packets, to
allow netdev providers to amortize locking and queuing costs.
This is especially true for netdev-dpdk.

Later commits exploit the new API.

Signed-off-by: Daniele Di Proietto <ddiproietto@vmware.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
This commit is contained in:
Daniele Di Proietto
2014-06-23 11:43:58 -07:00
committed by Pravin B Shelar
parent 910885540a
commit f4fd623c4c
8 changed files with 242 additions and 156 deletions

View File

@@ -2123,7 +2123,7 @@ dp_execute_cb(void *aux_, struct dpif_packet *packet,
case OVS_ACTION_ATTR_OUTPUT:
p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
if (p) {
netdev_send(p->netdev, packet, may_steal);
netdev_send(p->netdev, &packet, 1, may_steal);
}
break;

View File

@@ -686,14 +686,13 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
* system or a tap device.
*/
static int
netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
netdev_bsd_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
bool may_steal)
{
struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
const char *name = netdev_get_name(netdev_);
const void *data = ofpbuf_data(&pkt->ofpbuf);
size_t size = ofpbuf_size(&pkt->ofpbuf);
int error;
int i;
ovs_mutex_lock(&dev->mutex);
if (dev->tap_fd < 0 && !dev->pcap) {
@@ -702,35 +701,43 @@ netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
error = 0;
}
while (!error) {
ssize_t retval;
if (dev->tap_fd >= 0) {
retval = write(dev->tap_fd, data, size);
} else {
retval = pcap_inject(dev->pcap, data, size);
}
if (retval < 0) {
if (errno == EINTR) {
continue;
for (i = 0; i < cnt; i++) {
const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
while (!error) {
ssize_t retval;
if (dev->tap_fd >= 0) {
retval = write(dev->tap_fd, data, size);
} else {
error = errno;
if (error != EAGAIN) {
VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: "
"%s", name, ovs_strerror(error));
}
retval = pcap_inject(dev->pcap, data, size);
}
if (retval < 0) {
if (errno == EINTR) {
continue;
} else {
error = errno;
if (error != EAGAIN) {
VLOG_WARN_RL(&rl, "error sending Ethernet packet on"
" %s: %s", name, ovs_strerror(error));
}
}
} else if (retval != size) {
VLOG_WARN_RL(&rl, "sent partial Ethernet packet "
"(%"PRIuSIZE" bytes of "
"%"PRIuSIZE") on %s", retval, size, name);
error = EMSGSIZE;
} else {
break;
}
} else if (retval != size) {
VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
"%"PRIuSIZE") on %s", retval, size, name);
error = EMSGSIZE;
} else {
break;
}
}
ovs_mutex_unlock(&dev->mutex);
if (may_steal) {
dpif_packet_delete(pkt);
for (i = 0; i < cnt; i++) {
dpif_packet_delete(pkts[i]);
}
}
return error;

View File

@@ -206,8 +206,8 @@ dpdk_rte_mzalloc(size_t sz)
void
free_dpdk_buf(struct dpif_packet *p)
{
struct ofpbuf *b = &p->ofpbuf;
struct rte_mbuf *pkt = (struct rte_mbuf *) b->dpdk_buf;
struct ofpbuf *ofp = &p->ofpbuf;
struct rte_mbuf *pkt = (struct rte_mbuf *) ofp->dpdk_buf;
rte_mempool_put(pkt->pool, pkt);
}
@@ -612,104 +612,151 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
}
inline static void
dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
struct rte_mbuf *pkt)
dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
struct rte_mbuf **pkts, int cnt)
{
struct dpdk_tx_queue *txq = &dev->tx_q[qid];
uint64_t diff_tsc;
uint64_t cur_tsc;
uint32_t nb_tx;
rte_spinlock_lock(&txq->tx_lock);
txq->burst_pkts[txq->count++] = pkt;
if (txq->count == MAX_TX_QUEUE_LEN) {
goto flush;
}
cur_tsc = rte_get_timer_cycles();
if (txq->count == 1) {
txq->tsc = cur_tsc;
}
diff_tsc = cur_tsc - txq->tsc;
if (diff_tsc >= DRAIN_TSC) {
goto flush;
}
rte_spinlock_unlock(&txq->tx_lock);
return;
int i = 0;
flush:
nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
if (nb_tx != txq->count) {
/* free buffers if we couldn't transmit packets */
rte_mempool_put_bulk(dev->dpdk_mp->mp,
(void **) &txq->burst_pkts[nb_tx],
(txq->count - nb_tx));
rte_spinlock_lock(&txq->tx_lock);
while (i < cnt) {
int freeslots = MAX_TX_QUEUE_LEN - txq->count;
int tocopy = MIN(freeslots, cnt-i);
memcpy(&txq->burst_pkts[txq->count], &pkts[i],
tocopy * sizeof (struct rte_mbuf *));
txq->count += tocopy;
i += tocopy;
if (txq->count == MAX_TX_QUEUE_LEN) {
goto flush;
}
cur_tsc = rte_get_timer_cycles();
if (txq->count == 1) {
txq->tsc = cur_tsc;
}
diff_tsc = cur_tsc - txq->tsc;
if (diff_tsc >= DRAIN_TSC) {
goto flush;
}
continue;
flush:
nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
txq->count);
if (nb_tx != txq->count) {
/* free buffers if we couldn't transmit packets */
rte_mempool_put_bulk(dev->dpdk_mp->mp,
(void **) &txq->burst_pkts[nb_tx],
(txq->count - nb_tx));
}
txq->count = 0;
}
txq->count = 0;
rte_spinlock_unlock(&txq->tx_lock);
}
/* Tx function. Transmit packets indefinitely */
static void
dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
{
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
struct rte_mbuf *pkt;
struct rte_mbuf *mbufs[cnt];
int i, newcnt = 0;
pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
if (!pkt) {
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_dropped++;
ovs_mutex_unlock(&dev->mutex);
return;
for (i = 0; i < cnt; i++) {
int size = ofpbuf_size(&pkts[i]->ofpbuf);
if (size > dev->max_packet_len) {
VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
(int)size , dev->max_packet_len);
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_dropped++;
ovs_mutex_unlock(&dev->mutex);
continue;
}
mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
if (!mbufs[newcnt]) {
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_dropped++;
ovs_mutex_unlock(&dev->mutex);
return;
}
/* We have to do a copy for now */
memcpy(mbufs[newcnt]->pkt.data, ofpbuf_data(&pkts[i]->ofpbuf), size);
rte_pktmbuf_data_len(mbufs[newcnt]) = size;
rte_pktmbuf_pkt_len(mbufs[newcnt]) = size;
newcnt++;
}
/* We have to do a copy for now */
memcpy(pkt->pkt.data, buf, size);
rte_pktmbuf_data_len(pkt) = size;
rte_pktmbuf_pkt_len(pkt) = size;
dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
}
static int
netdev_dpdk_send(struct netdev *netdev,
struct dpif_packet *packet, bool may_steal)
netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
bool may_steal)
{
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
struct ofpbuf *ofpbuf = &packet->ofpbuf;
int ret;
int i;
if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
(int)ofpbuf_size(ofpbuf) , dev->max_packet_len);
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_dropped++;
ovs_mutex_unlock(&dev->mutex);
ret = E2BIG;
goto out;
}
if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), ofpbuf_size(ofpbuf));
if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
dpdk_do_tx_copy(netdev, pkts, cnt);
if (may_steal) {
dpif_packet_delete(packet);
for (i = 0; i < cnt; i++) {
dpif_packet_delete(pkts[i]);
}
}
} else {
int qid;
int next_tx_idx = 0;
int dropped = 0;
qid = rte_lcore_id() % NR_QUEUE;
dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
for (i = 0; i < cnt; i++) {
int size = ofpbuf_size(&pkts[i]->ofpbuf);
if (OVS_UNLIKELY(size > dev->max_packet_len)) {
if (next_tx_idx != i) {
dpdk_queue_pkts(dev, qid,
(struct rte_mbuf **)&pkts[next_tx_idx],
i-next_tx_idx);
VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
(int)size , dev->max_packet_len);
dpif_packet_delete(pkts[i]);
dropped++;
}
next_tx_idx = i + 1;
}
}
if (next_tx_idx != cnt) {
dpdk_queue_pkts(dev, qid,
(struct rte_mbuf **)&pkts[next_tx_idx],
cnt-next_tx_idx);
}
if (OVS_UNLIKELY(dropped)) {
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_dropped += dropped;
ovs_mutex_unlock(&dev->mutex);
}
}
ret = 0;
out:
return ret;
}

View File

@@ -846,51 +846,61 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
}
static int
netdev_dummy_send(struct netdev *netdev, struct dpif_packet *pkt,
netdev_dummy_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
bool may_steal)
{
struct netdev_dummy *dev = netdev_dummy_cast(netdev);
const void *buffer = ofpbuf_data(&pkt->ofpbuf);
size_t size = ofpbuf_size(&pkt->ofpbuf);
int error = 0;
int i;
if (size < ETH_HEADER_LEN) {
return EMSGSIZE;
} else {
const struct eth_header *eth = buffer;
int max_size;
for (i = 0; i < cnt; i++) {
const void *buffer = ofpbuf_data(&pkts[i]->ofpbuf);
size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
if (size < ETH_HEADER_LEN) {
error = EMSGSIZE;
break;
} else {
const struct eth_header *eth = buffer;
int max_size;
ovs_mutex_lock(&dev->mutex);
max_size = dev->mtu + ETH_HEADER_LEN;
ovs_mutex_unlock(&dev->mutex);
if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
max_size += VLAN_HEADER_LEN;
}
if (size > max_size) {
error = EMSGSIZE;
break;
}
}
ovs_mutex_lock(&dev->mutex);
max_size = dev->mtu + ETH_HEADER_LEN;
dev->stats.tx_packets++;
dev->stats.tx_bytes += size;
dummy_packet_conn_send(&dev->conn, buffer, size);
if (dev->tx_pcap) {
struct ofpbuf packet;
ofpbuf_use_const(&packet, buffer, size);
ovs_pcap_write(dev->tx_pcap, &packet);
fflush(dev->tx_pcap);
}
ovs_mutex_unlock(&dev->mutex);
if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
max_size += VLAN_HEADER_LEN;
}
if (size > max_size) {
return EMSGSIZE;
}
}
ovs_mutex_lock(&dev->mutex);
dev->stats.tx_packets++;
dev->stats.tx_bytes += size;
dummy_packet_conn_send(&dev->conn, buffer, size);
if (dev->tx_pcap) {
struct ofpbuf packet;
ofpbuf_use_const(&packet, buffer, size);
ovs_pcap_write(dev->tx_pcap, &packet);
fflush(dev->tx_pcap);
}
ovs_mutex_unlock(&dev->mutex);
if (may_steal) {
dpif_packet_delete(pkt);
for (i = 0; i < cnt; i++) {
dpif_packet_delete(pkts[i]);
}
}
return 0;
return error;
}
static int

View File

@@ -1012,7 +1012,7 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s",
ovs_strerror(errno), netdev_rxq_get_name(rxq_));
}
ofpbuf_delete(buffer);
dpif_packet_delete(packet);
} else {
dp_packet_pad(buffer);
packets[0] = packet;
@@ -1057,13 +1057,16 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
* The kernel maintains a packet transmission queue, so the caller is not
* expected to do additional queuing of packets. */
static int
netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
netdev_linux_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
bool may_steal)
{
const void *data = ofpbuf_data(&pkt->ofpbuf);
size_t size = ofpbuf_size(&pkt->ofpbuf);
int i;
int error = 0;
for (;;) {
/* 'i' is incremented only if there's no error */
for (i = 0; i < cnt;) {
const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
ssize_t retval;
if (!is_tap_netdev(netdev_)) {
@@ -1113,31 +1116,41 @@ netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
retval = write(netdev->tap_fd, data, size);
}
if (may_steal) {
dpif_packet_delete(pkt);
}
if (retval < 0) {
/* The Linux AF_PACKET implementation never blocks waiting for room
* for packets, instead returning ENOBUFS. Translate this into
* EAGAIN for the caller. */
if (errno == ENOBUFS) {
return EAGAIN;
} else if (errno == EINTR) {
error = errno == ENOBUFS ? EAGAIN : errno;
if (error == EINTR) {
/* continue without incrementing 'i', i.e. retry this packet */
continue;
} else if (errno != EAGAIN) {
VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
netdev_get_name(netdev_), ovs_strerror(errno));
}
return errno;
break;
} else if (retval != size) {
VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
"%"PRIuSIZE") on %s", retval, size, netdev_get_name(netdev_));
return EMSGSIZE;
} else {
return 0;
VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes"
" of %"PRIuSIZE") on %s", retval, size,
netdev_get_name(netdev_));
error = EMSGSIZE;
break;
}
/* Process the next packet in the batch */
i++;
}
if (may_steal) {
for (i = 0; i < cnt; i++) {
dpif_packet_delete(pkts[i]);
}
}
if (error && error != EAGAIN) {
VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
netdev_get_name(netdev_), ovs_strerror(error));
}
return error;
}
/* Registers with the poll loop to wake up from the next call to poll_block()

View File

@@ -250,13 +250,16 @@ struct netdev_class {
const struct netdev_tunnel_config *
(*get_tunnel_config)(const struct netdev *netdev);
/* Sends the buffer on 'netdev'.
* Returns 0 if successful, otherwise a positive errno value. Returns
* EAGAIN without blocking if the packet cannot be queued immediately.
* Returns EMSGSIZE if a partial packet was transmitted or if the packet
* is too big or too small to transmit on the device.
/* Sends buffers on 'netdev'.
* Returns 0 if successful (for every buffer), otherwise a positive errno value.
* Returns EAGAIN without blocking if one or more packets cannot be
* queued immediately. Returns EMSGSIZE if a partial packet was transmitted
* or if a packet is too big or too small to transmit on the device.
*
* To retain ownership of 'buffer' caller can set may_steal to false.
* If the function returns a non-zero value, some of the packets might have
* been sent anyway.
*
* To retain ownership of 'buffers' caller can set may_steal to false.
*
* The network device is expected to maintain a packet transmission queue,
* so that the caller does not ordinarily have to do additional queuing of
@@ -268,7 +271,7 @@ struct netdev_class {
* network device from being usefully used by the netdev-based "userspace
* datapath". It will also prevent the OVS implementation of bonding from
* working properly over 'netdev'.) */
int (*send)(struct netdev *netdev, struct dpif_packet *buffer,
int (*send)(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
bool may_steal);
/* Registers with the poll loop to wake up from the next call to

View File

@@ -650,10 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx)
: 0);
}
/* Sends 'buffer' on 'netdev'. Returns 0 if successful, otherwise a positive
* errno value. Returns EAGAIN without blocking if the packet cannot be queued
* immediately. Returns EMSGSIZE if a partial packet was transmitted or if
* the packet is too big or too small to transmit on the device.
/* Sends 'buffers' on 'netdev'. Returns 0 if successful (for every packet),
* otherwise a positive errno value. Returns EAGAIN without blocking if
 * at least one of the packets cannot be queued immediately. Returns EMSGSIZE
* if a partial packet was transmitted or if a packet is too big or too small
* to transmit on the device.
*
* If the function returns a non-zero value, some of the packets might have
* been sent anyway.
*
* To retain ownership of 'buffer' caller can set may_steal to false.
*
@@ -663,12 +667,13 @@ netdev_rxq_drain(struct netdev_rxq *rx)
* Some network devices may not implement support for this function. In such
* cases this function will always return EOPNOTSUPP. */
int
netdev_send(struct netdev *netdev, struct dpif_packet *buffer, bool may_steal)
netdev_send(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
bool may_steal)
{
int error;
error = (netdev->netdev_class->send
? netdev->netdev_class->send(netdev, buffer, may_steal)
? netdev->netdev_class->send(netdev, buffers, cnt, may_steal)
: EOPNOTSUPP);
if (!error) {
COVERAGE_INC(netdev_sent);

View File

@@ -173,7 +173,8 @@ void netdev_rxq_wait(struct netdev_rxq *);
int netdev_rxq_drain(struct netdev_rxq *);
/* Packet transmission. */
int netdev_send(struct netdev *, struct dpif_packet *, bool may_steal);
int netdev_send(struct netdev *, struct dpif_packet **, int cnt,
bool may_steal);
void netdev_send_wait(struct netdev *);
/* Hardware address. */