2
0
mirror of https://github.com/openvswitch/ovs synced 2025-09-04 00:05:15 +00:00

netdev-dpdk: add dpdk rings to netdev-dpdk

Shared memory ring patch

This patch enables the client dpdk rings within the netdev-dpdk.  It adds
a new dpdk device called dpdkr (other naming suggestions?).  This allows
for the use of shared memory to communicate with other dpdk applications,
on the host or within a virtual machine.  Instructions for use are in
INSTALL.DPDK.

This has been tested on Intel multi-core platforms and with the client
application within the host.

Signed-off-by: Gerald Rogers <gerald.rogers@intel.com>
Signed-off-by: Maryam Tahhan <maryam.tahhan@intel.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
This commit is contained in:
maryam.tahhan
2014-07-11 13:37:11 +01:00
committed by Pravin B Shelar
parent 93295354df
commit 95fb793ae7
7 changed files with 538 additions and 92 deletions

View File

@@ -89,6 +89,7 @@ Luca Giraudo lgiraudo@nicira.com
Luigi Rizzo rizzo@iet.unipi.it
Mark Hamilton mhamilton@nicira.com
Martin Casado casado@nicira.com
Maryam Tahhan maryam.tahhan@intel.com
Mehak Mahajan mmahajan@nicira.com
Murphy McCauley murphy.mccauley@gmail.com
Natasha Gude natasha@nicira.com

View File

@@ -190,6 +190,45 @@ The core 23 is left idle, which allows core 7 to run at full rate.
Future changes may change the need for cpu core affinitization.
DPDK Rings :
------------
Following the steps above to create a bridge, you can now add dpdk rings
as a port to the vswitch. OVS will expect the DPDK ring device name to
start with dpdkr and end with a portid.
ovs-vsctl add-port br0 dpdkr0 -- set Interface dpdkr0 type=dpdkr
DPDK rings client test application
Included in the test directory is a sample DPDK application for testing
the rings. This is from the base dpdk directory and modified to work
with the ring naming used within ovs.
location tests/ovs_client
To run the client :
ovsclient -c 1 -n 4 --proc-type=secondary -- -n "port id you gave dpdkr"
In the case of the dpdkr example above the "port id you gave dpdkr" is 0.
It is essential to have --proc-type=secondary
The application simply receives an mbuf on the receive queue of the
ethernet ring and then places that same mbuf on the transmit ring of
the ethernet ring. It is a trivial loopback application.
In addition to executing the client in the host, you can execute it within
a guest VM. To do so you will need a patched qemu. You can download the
patch and getting started guide at :
https://01.org/packet-processing/downloads
A general rule of thumb for better performance is that the client
application should not be assigned the same dpdk core mask "-c" as
the vswitchd.
Restrictions:
-------------
@@ -202,6 +241,11 @@ Restrictions:
device queues configured.
- Work with 1500 MTU, needs few changes in DPDK lib to fix this issue.
- Currently DPDK port does not make use any offload functionality.
ivshmem
- The shared memory is currently restricted to the use of a 1GB
huge pages.
- All huge pages are shared amongst the host, clients, virtual
machines etc.
Bug Reporting:
--------------

View File

@@ -153,6 +153,22 @@ struct dpdk_tx_queue {
struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
};
/* dpdk has no way to remove dpdk ring ethernet devices
so we have to keep them around once they've been created
*/
static struct list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
= LIST_INITIALIZER(&dpdk_ring_list);
struct dpdk_ring {
/* For the client rings */
struct rte_ring *cring_tx;
struct rte_ring *cring_rx;
int user_port_id; /* User given port no, parsed from port name */
int eth_port_id; /* ethernet device port id */
struct list list_node OVS_GUARDED_BY(dpdk_mutex);
};
struct netdev_dpdk {
struct netdev up;
int port_id;
@@ -276,7 +292,10 @@ dpdk_mp_get(int socket_id, int mtu) OVS_REQUIRES(dpdk_mutex)
dmp->mtu = mtu;
dmp->refcount = 1;
snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d", dmp->mtu);
if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d", dmp->mtu) < 0) {
return NULL;
}
dmp->mp = rte_mempool_create(mp_name, NB_MBUF, MBUF_SIZE(mtu),
MP_CACHE_SZ,
sizeof(struct rte_pktmbuf_pool_private),
@@ -365,13 +384,13 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
int i;
if (dev->port_id < 0 || dev->port_id >= rte_eth_dev_count()) {
return -ENODEV;
return ENODEV;
}
diag = rte_eth_dev_configure(dev->port_id, NR_QUEUE, NR_QUEUE, &port_conf);
if (diag) {
VLOG_ERR("eth dev config error %d",diag);
return diag;
return -diag;
}
for (i = 0; i < NR_QUEUE; i++) {
@@ -379,7 +398,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
dev->socket_id, &tx_conf);
if (diag) {
VLOG_ERR("eth dev tx queue setup error %d",diag);
return diag;
return -diag;
}
}
@@ -389,14 +408,14 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
&rx_conf, dev->dpdk_mp->mp);
if (diag) {
VLOG_ERR("eth dev rx queue setup error %d",diag);
return diag;
return -diag;
}
}
diag = rte_eth_dev_start(dev->port_id);
if (diag) {
VLOG_ERR("eth dev start error %d",diag);
return diag;
return -diag;
}
rte_eth_promiscuous_enable(dev->port_id);
@@ -431,61 +450,81 @@ netdev_dpdk_alloc(void)
}
static int
netdev_dpdk_construct(struct netdev *netdev_)
netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk_mutex)
{
struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
unsigned int port_no;
char *cport;
int err;
int err = 0;
int i;
if (rte_eal_init_ret) {
return rte_eal_init_ret;
}
ovs_mutex_init(&netdev->mutex);
ovs_mutex_lock(&dpdk_mutex);
cport = netdev_->name + 4; /* Names always start with "dpdk" */
if (strncmp(netdev_->name, "dpdk", 4)) {
err = ENODEV;
goto unlock_dpdk;
}
port_no = strtol(cport, 0, 0); /* string must be null terminated */
ovs_mutex_lock(&netdev->mutex);
for (i = 0; i < NR_QUEUE; i++) {
rte_spinlock_init(&netdev->tx_q[i].tx_lock);
}
ovs_mutex_init(&netdev->mutex);
netdev->port_id = port_no;
ovs_mutex_lock(&netdev->mutex);
netdev->flags = 0;
netdev->mtu = ETHER_MTU;
netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
/* TODO: need to discover device node at run time. */
netdev->socket_id = SOCKET0;
netdev->port_id = port_no;
netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
if (!netdev->dpdk_mp) {
err = ENOMEM;
goto unlock_dev;
goto unlock;
}
err = dpdk_eth_dev_init(netdev);
if (err) {
goto unlock_dev;
goto unlock;
}
netdev_->n_rxq = NR_QUEUE;
list_push_back(&dpdk_list, &netdev->list_node);
unlock_dev:
unlock:
ovs_mutex_unlock(&netdev->mutex);
unlock_dpdk:
return err;
}
static int
dpdk_dev_parse_name(const char dev_name[], const char prefix[],
unsigned int *port_no)
{
const char *cport;
if (strncmp(dev_name, prefix, strlen(prefix))) {
return ENODEV;
}
cport = dev_name + strlen(prefix);
*port_no = strtol(cport, 0, 0); /* string must be null terminated */
return 0;
}
static int
netdev_dpdk_construct(struct netdev *netdev)
{
unsigned int port_no;
int err;
if (rte_eal_init_ret) {
return rte_eal_init_ret;
}
/* Names always start with "dpdk" */
err = dpdk_dev_parse_name(netdev->name, "dpdk", &port_no);
if (err) {
return err;
}
ovs_mutex_lock(&dpdk_mutex);
err = netdev_dpdk_init(netdev, port_no);
ovs_mutex_unlock(&dpdk_mutex);
return err;
}
@@ -667,6 +706,7 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
for (i = 0; i < cnt; i++) {
int size = ofpbuf_size(&pkts[i]->ofpbuf);
if (OVS_UNLIKELY(size > dev->max_packet_len)) {
VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
(int)size , dev->max_packet_len);
@@ -983,8 +1023,7 @@ netdev_dpdk_set_miimon(struct netdev *netdev_ OVS_UNUSED,
static int
netdev_dpdk_update_flags__(struct netdev_dpdk *dev,
enum netdev_flags off, enum netdev_flags on,
enum netdev_flags *old_flagsp)
OVS_REQUIRES(dev->mutex)
enum netdev_flags *old_flagsp) OVS_REQUIRES(dev->mutex)
{
int err;
@@ -1003,7 +1042,7 @@ netdev_dpdk_update_flags__(struct netdev_dpdk *dev,
if (dev->flags & NETDEV_UP) {
err = rte_eth_dev_start(dev->port_id);
if (err)
return err;
return -err;
}
if (dev->flags & NETDEV_PROMISC) {
@@ -1047,6 +1086,7 @@ netdev_dpdk_get_status(const struct netdev *netdev_, struct smap *args)
smap_add_format(args, "driver_name", "%s", dev_info.driver_name);
smap_add_format(args, "port_no", "%d", dev->port_id);
smap_add_format(args, "numa_id", "%d", rte_eth_dev_socket_id(dev->port_id));
smap_add_format(args, "driver_name", "%s", dev_info.driver_name);
smap_add_format(args, "min_rx_bufsize", "%u", dev_info.min_rx_bufsize);
@@ -1133,13 +1173,13 @@ dpdk_class_init(void)
result = rte_pmd_init_all();
if (result) {
VLOG_ERR("Cannot init PMD");
return result;
return -result;
}
result = rte_eal_pci_probe();
if (result) {
VLOG_ERR("Cannot probe PCI");
return result;
return -result;
}
if (rte_eth_dev_count() < 1) {
@@ -1159,68 +1199,176 @@ dpdk_class_init(void)
return 0;
}
static struct netdev_class netdev_dpdk_class = {
"dpdk",
dpdk_class_init, /* init */
NULL, /* netdev_dpdk_run */
NULL, /* netdev_dpdk_wait */
/* Client Rings */
netdev_dpdk_alloc,
netdev_dpdk_construct,
netdev_dpdk_destruct,
netdev_dpdk_dealloc,
netdev_dpdk_get_config,
NULL, /* netdev_dpdk_set_config */
NULL, /* get_tunnel_config */
static int
dpdk_ring_class_init(void)
{
VLOG_INFO("Initialized dpdk client handlers:\n");
return 0;
}
netdev_dpdk_send, /* send */
NULL, /* send_wait */
static int
dpdk_ring_create(const char dev_name[], unsigned int port_no,
unsigned int *eth_port_id)
{
struct dpdk_ring *ivshmem;
char ring_name[10];
int err;
netdev_dpdk_set_etheraddr,
netdev_dpdk_get_etheraddr,
netdev_dpdk_get_mtu,
netdev_dpdk_set_mtu,
netdev_dpdk_get_ifindex,
netdev_dpdk_get_carrier,
netdev_dpdk_get_carrier_resets,
netdev_dpdk_set_miimon,
netdev_dpdk_get_stats,
netdev_dpdk_set_stats,
netdev_dpdk_get_features,
NULL, /* set_advertisements */
ivshmem = dpdk_rte_mzalloc(sizeof *ivshmem);
if (ivshmem == NULL) {
return ENOMEM;
}
NULL, /* set_policing */
NULL, /* get_qos_types */
NULL, /* get_qos_capabilities */
NULL, /* get_qos */
NULL, /* set_qos */
NULL, /* get_queue */
NULL, /* set_queue */
NULL, /* delete_queue */
NULL, /* get_queue_stats */
NULL, /* queue_dump_start */
NULL, /* queue_dump_next */
NULL, /* queue_dump_done */
NULL, /* dump_queue_stats */
err = snprintf(ring_name, 10, "%s_tx", dev_name);
if (err < 0) {
return -err;
}
NULL, /* get_in4 */
NULL, /* set_in4 */
NULL, /* get_in6 */
NULL, /* add_router */
NULL, /* get_next_hop */
netdev_dpdk_get_status,
NULL, /* arp_lookup */
ivshmem->cring_tx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
if (ivshmem->cring_tx == NULL) {
rte_free(ivshmem);
return ENOMEM;
}
netdev_dpdk_update_flags,
err = snprintf(ring_name, 10, "%s_rx", dev_name);
if (err < 0) {
return -err;
}
netdev_dpdk_rxq_alloc,
netdev_dpdk_rxq_construct,
netdev_dpdk_rxq_destruct,
netdev_dpdk_rxq_dealloc,
netdev_dpdk_rxq_recv,
NULL, /* rxq_wait */
NULL, /* rxq_drain */
};
ivshmem->cring_rx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
if (ivshmem->cring_rx == NULL) {
rte_free(ivshmem);
return ENOMEM;
}
err = rte_eth_from_rings(&ivshmem->cring_rx, 1, &ivshmem->cring_tx, 1, SOCKET0);
if (err < 0) {
rte_free(ivshmem);
return ENODEV;
}
ivshmem->user_port_id = port_no;
ivshmem->eth_port_id = rte_eth_dev_count() - 1;
list_push_back(&dpdk_ring_list, &ivshmem->list_node);
*eth_port_id = ivshmem->eth_port_id;
return 0;
}
static int
dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id) OVS_REQUIRES(dpdk_mutex)
{
struct dpdk_ring *ivshmem;
unsigned int port_no;
int err = 0;
/* Names always start with "dpdkr" */
err = dpdk_dev_parse_name(dev_name, "dpdkr", &port_no);
if (err) {
return err;
}
/* look through our list to find the device */
LIST_FOR_EACH (ivshmem, list_node, &dpdk_ring_list) {
if (ivshmem->user_port_id == port_no) {
VLOG_INFO("Found dpdk ring device %s:\n", dev_name);
*eth_port_id = ivshmem->eth_port_id; /* really all that is needed */
return 0;
}
}
/* Need to create the device rings */
return dpdk_ring_create(dev_name, port_no, eth_port_id);
}
static int
netdev_dpdk_ring_construct(struct netdev *netdev)
{
unsigned int port_no = 0;
int err = 0;
if (rte_eal_init_ret) {
return rte_eal_init_ret;
}
ovs_mutex_lock(&dpdk_mutex);
err = dpdk_ring_open(netdev->name, &port_no);
if (err) {
goto unlock_dpdk;
}
err = netdev_dpdk_init(netdev, port_no);
unlock_dpdk:
ovs_mutex_unlock(&dpdk_mutex);
return err;
}
#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT) \
{ \
NAME, \
INIT, /* init */ \
NULL, /* netdev_dpdk_run */ \
NULL, /* netdev_dpdk_wait */ \
\
netdev_dpdk_alloc, \
CONSTRUCT, \
netdev_dpdk_destruct, \
netdev_dpdk_dealloc, \
netdev_dpdk_get_config, \
NULL, /* netdev_dpdk_set_config */ \
NULL, /* get_tunnel_config */ \
\
netdev_dpdk_send, /* send */ \
NULL, /* send_wait */ \
\
netdev_dpdk_set_etheraddr, \
netdev_dpdk_get_etheraddr, \
netdev_dpdk_get_mtu, \
netdev_dpdk_set_mtu, \
netdev_dpdk_get_ifindex, \
netdev_dpdk_get_carrier, \
netdev_dpdk_get_carrier_resets, \
netdev_dpdk_set_miimon, \
netdev_dpdk_get_stats, \
netdev_dpdk_set_stats, \
netdev_dpdk_get_features, \
NULL, /* set_advertisements */ \
\
NULL, /* set_policing */ \
NULL, /* get_qos_types */ \
NULL, /* get_qos_capabilities */ \
NULL, /* get_qos */ \
NULL, /* set_qos */ \
NULL, /* get_queue */ \
NULL, /* set_queue */ \
NULL, /* delete_queue */ \
NULL, /* get_queue_stats */ \
NULL, /* queue_dump_start */ \
NULL, /* queue_dump_next */ \
NULL, /* queue_dump_done */ \
NULL, /* dump_queue_stats */ \
\
NULL, /* get_in4 */ \
NULL, /* set_in4 */ \
NULL, /* get_in6 */ \
NULL, /* add_router */ \
NULL, /* get_next_hop */ \
netdev_dpdk_get_status, \
NULL, /* arp_lookup */ \
\
netdev_dpdk_update_flags, \
\
netdev_dpdk_rxq_alloc, \
netdev_dpdk_rxq_construct, \
netdev_dpdk_rxq_destruct, \
netdev_dpdk_rxq_dealloc, \
netdev_dpdk_rxq_recv, \
NULL, /* rx_wait */ \
NULL, /* rxq_drain */ \
}
int
dpdk_init(int argc, char **argv)
@@ -1252,10 +1400,28 @@ dpdk_init(int argc, char **argv)
return result + 1;
}
const struct netdev_class dpdk_class =
NETDEV_DPDK_CLASS(
"dpdk",
dpdk_class_init,
netdev_dpdk_construct);
const struct netdev_class dpdk_ring_class =
NETDEV_DPDK_CLASS(
"dpdkr",
dpdk_ring_class_init,
netdev_dpdk_ring_construct);
void
netdev_dpdk_register(void)
{
netdev_register_provider(&netdev_dpdk_class);
static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
if (ovsthread_once_start(&once)) {
netdev_register_provider(&dpdk_class);
netdev_register_provider(&dpdk_ring_class);
ovsthread_once_done(&once);
}
}
int

View File

@@ -12,6 +12,7 @@ struct dpif_packet;
#include <rte_eal.h>
#include <rte_debug.h>
#include <rte_ethdev.h>
#include <rte_eth_ring.h>
#include <rte_errno.h>
#include <rte_memzone.h>
#include <rte_memcpy.h>

View File

@@ -98,7 +98,8 @@ netdev_n_rxq(const struct netdev *netdev)
bool
netdev_is_pmd(const struct netdev *netdev)
{
return !strcmp(netdev->netdev_class->type, "dpdk");
return (!strcmp(netdev->netdev_class->type, "dpdk") ||
!strcmp(netdev->netdev_class->type, "dpdkr"));
}
static void

View File

@@ -206,6 +206,13 @@ tests/idltest.ovsidl: $(IDLTEST_IDL_FILES)
tests/idltest.c: tests/idltest.h
if DPDK_NETDEV
noinst_PROGRAMS += tests/ovsclient
tests_ovsclient_SOURCES = \
tests/ovs_client/ovs_client.c
tests_ovsclient_LDADD = lib/libopenvswitch.la $(LIBS)
endif
noinst_PROGRAMS += tests/ovstest
tests_ovstest_SOURCES = \
tests/ovstest.c \

View File

@@ -0,0 +1,226 @@
/*
* BSD LICENSE
*
* Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Intel Corporation nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <getopt.h>
#include <config.h>
#include <rte_config.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_string_fns.h>
#include <rte_ip.h>
#include <rte_byteorder.h>
/* Number of packets to attempt to read from queue. */
#define PKT_READ_SIZE ((uint16_t)32)
/* Define common names for structures shared between ovs_dpdk and client. */
#define MP_CLIENT_RXQ_NAME "dpdkr%u_tx"
#define MP_CLIENT_TXQ_NAME "dpdkr%u_rx"
#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1
#define BASE_10 10
/* Our client id number - tells us which rx queue to read, and tx
* queue to write to.
*/
static uint8_t client_id = 0;
int no_pkt;
int pkt;
/*
* Given the rx queue name template above, get the queue name.
*/
static inline const char *
get_rx_queue_name(unsigned id)
{
/* Buffer for return value. Size calculated by %u being replaced
* by maximum 3 digits (plus an extra byte for safety).
*/
static char buffer[sizeof(MP_CLIENT_RXQ_NAME) + 2];
rte_snprintf(buffer, sizeof(buffer) - 1, MP_CLIENT_RXQ_NAME, id);
return buffer;
}
/*
* Given the tx queue name template above, get the queue name.
*/
static inline const char *
get_tx_queue_name(unsigned id)
{
/* Buffer for return value. Size calculated by %u being replaced
* by maximum 3 digits (plus an extra byte for safety).
*/
static char buffer[sizeof(MP_CLIENT_TXQ_NAME) + 2];
rte_snprintf(buffer, sizeof(buffer) - 1, MP_CLIENT_TXQ_NAME, id);
return buffer;
}
/*
* Print a usage message.
*/
static void
usage(const char *progname)
{
printf("\nUsage: %s [EAL args] -- -n <client_id>\n", progname);
}
/*
* Convert the client id number from a string to an int.
*/
static int
parse_client_num(const char *client)
{
char *end = NULL;
unsigned long temp = 0;
if (client == NULL || *client == '\0') {
return -1;
}
temp = strtoul(client, &end, BASE_10);
/* If valid string argument is provided, terminating '/0' character
* is stored in 'end'. */
if (end == NULL || *end != '\0') {
return -1;
}
client_id = (uint8_t)temp;
return 0;
}
/*
* Parse the application arguments to the client app.
*/
static int
parse_app_args(int argc, char *argv[])
{
int option_index = 0, opt = 0;
char **argvopt = argv;
const char *progname = NULL;
static struct option lgopts[] = {
{NULL, 0, 0, 0 }
};
progname = argv[0];
while ((opt = getopt_long(argc, argvopt, "n:", lgopts,
&option_index)) != EOF) {
switch (opt) {
case 'n':
if (parse_client_num(optarg) != 0) {
usage(progname);
return -1;
}
break;
default:
usage(progname);
return -1;
}
}
return 0;
}
/*
* Application main function - loops through
* receiving and processing packets. Never returns
*/
int
main(int argc, char *argv[])
{
struct rte_ring *rx_ring = NULL;
struct rte_ring *tx_ring = NULL;
int retval = 0;
void *pkts[PKT_READ_SIZE];
int rslt = 0;
if ((retval = rte_eal_init(argc, argv)) < 0) {
return -1;
}
argc -= retval;
argv += retval;
if (parse_app_args(argc, argv) < 0) {
rte_exit(EXIT_FAILURE, "Invalid command-line arguments\n");
}
rx_ring = rte_ring_lookup(get_rx_queue_name(client_id));
if (rx_ring == NULL) {
rte_exit(EXIT_FAILURE,
"Cannot get RX ring - is server process running?\n");
}
tx_ring = rte_ring_lookup(get_tx_queue_name(client_id));
if (tx_ring == NULL) {
rte_exit(EXIT_FAILURE,
"Cannot get TX ring - is server process running?\n");
}
RTE_LOG(INFO, APP, "Finished Process Init.\n");
printf("\nClient process %d handling packets\n", client_id);
printf("[Press Ctrl-C to quit ...]\n");
for (;;) {
unsigned rx_pkts = PKT_READ_SIZE;
/* Try dequeuing max possible packets first, if that fails, get the
* most we can. Loop body should only execute once, maximum.
*/
while (unlikely(rte_ring_dequeue_bulk(rx_ring, pkts, rx_pkts) != 0) &&
rx_pkts > 0) {
rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
}
if (rx_pkts > 0) {
pkt++;
/* blocking enqueue */
do {
rslt = rte_ring_enqueue_bulk(tx_ring, pkts, rx_pkts);
} while (rslt == -ENOBUFS);
} else {
no_pkt++;
}
if (!(pkt % 100000)) {
printf("pkt %d %d\n", pkt, no_pkt);
pkt = no_pkt = 0;
}
}
}