
netdev: netdev_send accepts multiple packets

The netdev_send function has been modified to accept multiple packets, so
that netdev providers can amortize locking and queuing costs over a whole
batch.  This is especially beneficial for netdev-dpdk.

Later commits exploit the new API.
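
In outline, the provider's send hook changes shape as follows (a sketch
based on the signatures in the diff below; the surrounding
netdev-provider.h declarations are abbreviated):

    /* Before: one packet per call, so locking and queuing costs are paid
     * for every packet. */
    int (*send)(struct netdev *netdev, struct dpif_packet *buffer,
                bool may_steal);

    /* After: a batch of cnt packets, so those costs are paid once per
     * batch. */
    int (*send)(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
                bool may_steal);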

Signed-off-by: Daniele Di Proietto <ddiproietto@vmware.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
Author:    Daniele Di Proietto
Date:      2014-06-23 11:43:58 -07:00
Committer: Pravin B Shelar
parent 910885540a
commit f4fd623c4c
8 changed files with 242 additions and 156 deletions

lib/netdev-dpdk.c

@@ -206,8 +206,8 @@ dpdk_rte_mzalloc(size_t sz)
 void
 free_dpdk_buf(struct dpif_packet *p)
 {
-    struct ofpbuf *b = &p->ofpbuf;
-    struct rte_mbuf *pkt = (struct rte_mbuf *) b->dpdk_buf;
+    struct ofpbuf *ofp = &p->ofpbuf;
+    struct rte_mbuf *pkt = (struct rte_mbuf *) ofp->dpdk_buf;
 
     rte_mempool_put(pkt->pool, pkt);
 }
@@ -612,104 +612,151 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
 }
 
 inline static void
-dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf *pkt)
+dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
+                struct rte_mbuf **pkts, int cnt)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint64_t diff_tsc;
     uint64_t cur_tsc;
     uint32_t nb_tx;
 
-    rte_spinlock_lock(&txq->tx_lock);
-    txq->burst_pkts[txq->count++] = pkt;
-    if (txq->count == MAX_TX_QUEUE_LEN) {
-        goto flush;
-    }
-    cur_tsc = rte_get_timer_cycles();
-    if (txq->count == 1) {
-        txq->tsc = cur_tsc;
-    }
-    diff_tsc = cur_tsc - txq->tsc;
-    if (diff_tsc >= DRAIN_TSC) {
-        goto flush;
-    }
-    rte_spinlock_unlock(&txq->tx_lock);
-    return;
+    int i = 0;
 
-flush:
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+    rte_spinlock_lock(&txq->tx_lock);
+    while (i < cnt) {
+        int freeslots = MAX_TX_QUEUE_LEN - txq->count;
+        int tocopy = MIN(freeslots, cnt-i);
+
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        if (txq->count == MAX_TX_QUEUE_LEN) {
+            goto flush;
+        }
+        cur_tsc = rte_get_timer_cycles();
+        if (txq->count == 1) {
+            txq->tsc = cur_tsc;
+        }
+        diff_tsc = cur_tsc - txq->tsc;
+        if (diff_tsc >= DRAIN_TSC) {
+            goto flush;
+        }
+        continue;
+
+    flush:
+        nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
+                                 txq->count);
+        if (nb_tx != txq->count) {
+            /* free buffers if we couldn't transmit packets */
+            rte_mempool_put_bulk(dev->dpdk_mp->mp,
+                                 (void **) &txq->burst_pkts[nb_tx],
+                                 (txq->count - nb_tx));
+        }
+        txq->count = 0;
     }
-    txq->count = 0;
     rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
 static void
-dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
+dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet **pkts, int cnt)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf *pkt;
+    struct rte_mbuf *mbufs[cnt];
+    int i, newcnt = 0;
 
-    pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
-    if (!pkt) {
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-        return;
+    for (i = 0; i < cnt; i++) {
+        int size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+        if (size > dev->max_packet_len) {
+            VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                         (int)size, dev->max_packet_len);
+
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
+
+            continue;
+        }
+
+        mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
+
+        if (!mbufs[newcnt]) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
+            return;
+        }
+
+        /* We have to do a copy for now */
+        memcpy(mbufs[newcnt]->pkt.data, ofpbuf_data(&pkts[i]->ofpbuf), size);
+
+        rte_pktmbuf_data_len(mbufs[newcnt]) = size;
+        rte_pktmbuf_pkt_len(mbufs[newcnt]) = size;
+
+        newcnt++;
     }
 
-    /* We have to do a copy for now */
-    memcpy(pkt->pkt.data, buf, size);
-
-    rte_pktmbuf_data_len(pkt) = size;
-    rte_pktmbuf_pkt_len(pkt) = size;
-
-    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
+    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
     dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
 }
 
 static int
-netdev_dpdk_send(struct netdev *netdev,
-                 struct dpif_packet *packet, bool may_steal)
+netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
+                 bool may_steal)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct ofpbuf *ofpbuf = &packet->ofpbuf;
     int ret;
+    int i;
 
-    if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
-        VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
-                     (int)ofpbuf_size(ofpbuf), dev->max_packet_len);
-
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-
-        ret = E2BIG;
-        goto out;
-    }
-
-    if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
-        dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), ofpbuf_size(ofpbuf));
+    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
+        dpdk_do_tx_copy(netdev, pkts, cnt);
 
         if (may_steal) {
-            dpif_packet_delete(packet);
+            for (i = 0; i < cnt; i++) {
+                dpif_packet_delete(pkts[i]);
+            }
         }
     } else {
         int qid;
+        int next_tx_idx = 0;
+        int dropped = 0;
 
         qid = rte_lcore_id() % NR_QUEUE;
 
-        dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
+        for (i = 0; i < cnt; i++) {
+            int size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+            if (OVS_UNLIKELY(size > dev->max_packet_len)) {
+                if (next_tx_idx != i) {
+                    dpdk_queue_pkts(dev, qid,
+                                    (struct rte_mbuf **)&pkts[next_tx_idx],
+                                    i-next_tx_idx);
+                }
+
+                VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                             (int)size, dev->max_packet_len);
+
+                dpif_packet_delete(pkts[i]);
+                dropped++;
+                next_tx_idx = i + 1;
+            }
+        }
+        if (next_tx_idx != cnt) {
+            dpdk_queue_pkts(dev, qid,
+                            (struct rte_mbuf **)&pkts[next_tx_idx],
+                            cnt-next_tx_idx);
+        }
+
+        if (OVS_UNLIKELY(dropped)) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped += dropped;
+            ovs_mutex_unlock(&dev->mutex);
+        }
     }
+    ret = 0;
 
-    ret = 0;
-out:
     return ret;
 }
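
For illustration, a caller of the new API might look like the sketch below.
Only netdev_send()'s batched signature comes from this commit; the helper
name and batch size are hypothetical:

    #include "netdev.h"        /* netdev_send(), struct netdev */
    #include "packet-dpif.h"   /* struct dpif_packet */

    #define SEND_BATCH_SIZE 32 /* hypothetical batch size */

    /* Hand a whole batch to the provider in one call, letting netdev-dpdk
     * take its tx queue spinlock once instead of once per packet. */
    static int
    send_batch(struct netdev *netdev,
               struct dpif_packet *pkts[SEND_BATCH_SIZE], int cnt)
    {
        /* may_steal == true allows the provider to consume the buffers. */
        return netdev_send(netdev, pkts, cnt, true);
    }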