代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 200ee63e092824edf12c7cc3172c61ba04a57d56 Mon Sep 17 00:00:00 2001
From: jinag12 <jiangheng14@huawei.com>
Date: Sat, 11 Mar 2023 15:59:51 +0000
Subject: [PATCH] support multi process
---
src/common/gazelle_opt.h | 4 +
src/lstack/api/lstack_epoll.c | 16 +-
src/lstack/api/lstack_wrap.c | 3 +-
src/lstack/core/lstack_cfg.c | 201 +++++++-
src/lstack/core/lstack_dpdk.c | 192 ++++---
src/lstack/core/lstack_init.c | 2 +
src/lstack/core/lstack_protocol_stack.c | 225 +++++++--
src/lstack/include/lstack_cfg.h | 17 +-
src/lstack/include/lstack_dpdk.h | 3 +-
src/lstack/include/lstack_ethdev.h | 23 +-
src/lstack/include/lstack_protocol_stack.h | 10 +
src/lstack/include/lstack_vdev.h | 7 +
src/lstack/lstack.conf | 4 +
src/lstack/netif/lstack_ethdev.c | 555 ++++++++++++++++++++-
src/lstack/netif/lstack_vdev.c | 49 +-
15 files changed, 1143 insertions(+), 168 deletions(-)
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index 745fdd8..e278107 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -90,4 +90,8 @@
#define SEND_TIME_WAIT_NS 20000
#define SECOND_NSECOND 1000000000
+#define LSTACK_SEND_THREAD_NAME "lstack_send"
+#define LSTACK_RECV_THREAD_NAME "lstack_recv"
+#define LSTACK_THREAD_NAME "gazellelstack"
+
#endif /* _GAZELLE_OPT_H_ */
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index da29590..4a10b09 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -74,8 +74,8 @@ void add_sock_event(struct lwip_sock *sock, uint32_t event)
}
struct protocol_stack *stack = sock->stack;
- if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
- list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
+ if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]);
}
}
@@ -95,7 +95,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable)
temp = nod;
}
- struct wakeup_poll *wakeup = container_of((node - stack->queue_id), struct wakeup_poll, wakeup_list);
+ struct wakeup_poll *wakeup = container_of((node - stack->stack_idx), struct wakeup_poll, wakeup_list);
if (!wakeup_thread_enable) {
if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) {
@@ -109,7 +109,7 @@ void wakeup_stack_epoll(struct protocol_stack *stack, bool wakeup_thread_enable)
stack->stats.wakeup_events++;
}
- list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
+ list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]);
}
}
@@ -291,7 +291,7 @@ static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, str
/* all stack same, don't change */
if (all_same_cnt && last_stack) {
- return last_stack->queue_id;
+ return last_stack->stack_idx;
}
/* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0. */
@@ -343,7 +343,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
switch (op) {
case EPOLL_CTL_ADD:
sock->wakeup = wakeup;
- wakeup->stack_fd_cnt[sock->stack->queue_id]++;
+ wakeup->stack_fd_cnt[sock->stack->stack_idx]++;
/* fall through */
case EPOLL_CTL_MOD:
sock->epoll_events = event->events | EPOLLERR | EPOLLHUP;
@@ -352,7 +352,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
break;
case EPOLL_CTL_DEL:
sock->epoll_events = 0;
- wakeup->stack_fd_cnt[sock->stack->queue_id]--;
+ wakeup->stack_fd_cnt[sock->stack->stack_idx]--;
pthread_spin_lock(&wakeup->event_list_lock);
list_del_node_null(&sock->event_list);
pthread_spin_unlock(&wakeup->event_list_lock);
@@ -652,7 +652,7 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
while (sock && sock->conn) {
sock->epoll_events = fds[i].events | POLLERR;
sock->wakeup = wakeup;
- stack_count[sock->stack->queue_id]++;
+ stack_count[sock->stack->stack_idx]++;
sock = sock->listen_next;
}
}
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index f438529..561c6e4 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -238,8 +238,7 @@ static inline int32_t do_listen(int32_t s, int32_t backlog)
return posix_api->listen_fn(s, backlog);
}
- int32_t ret = get_global_cfg_params()->listen_shadow ? stack_broadcast_listen(s, backlog) :
- stack_single_listen(s, backlog);
+ int32_t ret = stack_broadcast_listen(s, backlog);
if (ret != 0) {
return ret;
}
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 9195f34..72a3292 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -65,6 +65,10 @@ static int32_t parse_tcp_conn_count(void);
static int32_t parse_mbuf_count_per_conn(void);
static int32_t parse_send_ring_size(void);
static int32_t parse_expand_send_ring(void);
+static int32_t parse_num_process(void);
+static int32_t parse_process_numa(void);
+static int32_t parse_process_index(void);
+static int32_t parse_seperate_sendrecv_args(void);
static inline int32_t parse_int(void *arg, char * arg_string, int32_t default_val,
int32_t min_val, int32_t max_val)
@@ -97,6 +101,7 @@ static struct config_vector_t g_config_tbl[] = {
{ "use_ltran", parse_use_ltran },
{ "devices", parse_devices },
{ "dpdk_args", parse_dpdk_args },
+ { "seperate_send_recv", parse_seperate_sendrecv_args },
{ "num_cpus", parse_stack_cpu_number },
{ "num_wakeup", parse_wakeup_cpu_number },
{ "low_power_mode", parse_low_power_mode },
@@ -112,6 +117,9 @@ static struct config_vector_t g_config_tbl[] = {
{ "nic_read_number", parse_nic_read_number },
{ "send_ring_size", parse_send_ring_size },
{ "expand_send_ring", parse_expand_send_ring },
+ { "num_process", parse_num_process },
+ { "process_numa", parse_process_numa },
+ { "process_idx", parse_process_index },
{ NULL, NULL }
};
@@ -277,35 +285,99 @@ static int32_t parse_stack_cpu_number(void)
const config_setting_t *num_cpus = NULL;
const char *args = NULL;
- num_cpus = config_lookup(&g_config, "num_cpus");
- if (num_cpus == NULL) {
- return -EINVAL;
- }
+ if (!g_config_params.seperate_send_recv) {
+ num_cpus = config_lookup(&g_config, "num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
- args = config_setting_get_string(num_cpus);
- if (args == NULL) {
- return -EINVAL;
- }
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
- if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
- int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
- if (idx < 0) {
- g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
- g_config_params.dpdk_argc++;
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
- g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
- g_config_params.dpdk_argc++;
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
}
- }
- char *tmp_arg = strdup(args);
- int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS);
- free(tmp_arg);
- if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
- return -EINVAL;
- }
+ char *tmp_arg = strdup(args);
+ int32_t cnt = separate_str_to_array(tmp_arg, g_config_params.cpus, CFG_MAX_CPUS);
+ free(tmp_arg);
+ if (cnt <= 0 || cnt > CFG_MAX_CPUS) {
+ return -EINVAL;
+ }
+
+ g_config_params.num_cpu = cnt;
+ g_config_params.num_queue = (uint16_t)cnt;
+ g_config_params.tot_queue_num = g_config_params.num_queue;
+ } else {
+ // send_num_cpus
+ num_cpus = config_lookup(&g_config, "send_num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
+
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
+
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
+
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
+ }
+
+ char *tmp_arg_send = strdup(args);
+ int32_t cnt = separate_str_to_array(tmp_arg_send, g_config_params.send_cpus, CFG_MAX_CPUS);
+ free(tmp_arg_send);
+
+ // recv_num_cpus
+ num_cpus = config_lookup(&g_config, "recv_num_cpus");
+ if (num_cpus == NULL) {
+ return -EINVAL;
+ }
+
+ args = config_setting_get_string(num_cpus);
+ if (args == NULL) {
+ return -EINVAL;
+ }
- g_config_params.num_cpu = cnt;
+ if (!have_corelist_arg(g_config_params.dpdk_argc, g_config_params.dpdk_argv)) {
+ int32_t idx = get_param_idx(g_config_params.dpdk_argc, g_config_params.dpdk_argv, OPT_BIND_CORELIST);
+ if (idx < 0) {
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(OPT_BIND_CORELIST);
+ g_config_params.dpdk_argc++;
+
+ g_config_params.dpdk_argv[g_config_params.dpdk_argc] = strdup(args);
+ g_config_params.dpdk_argc++;
+ }
+ }
+
+ char *tmp_arg_recv = strdup(args);
+ cnt = separate_str_to_array(tmp_arg_recv, g_config_params.recv_cpus, CFG_MAX_CPUS);
+ free(tmp_arg_recv);
+
+ if (cnt <= 0 || cnt > CFG_MAX_CPUS / 2) {
+ return -EINVAL;
+ }
+
+ g_config_params.num_cpu = cnt;
+ g_config_params.num_queue = (uint16_t)cnt * 2;
+ g_config_params.tot_queue_num = g_config_params.num_queue;
+ }
return 0;
}
@@ -369,7 +441,12 @@ int32_t init_stack_numa_cpuset(struct protocol_stack *stack)
cpu_set_t stack_cpuset;
CPU_ZERO(&stack_cpuset);
for (int32_t idx = 0; idx < cfg->num_cpu; ++idx) {
- CPU_SET(cfg->cpus[idx], &stack_cpuset);
+ if (!cfg->seperate_send_recv) {
+ CPU_SET(cfg->cpus[idx], &stack_cpuset);
+ }else {
+ CPU_SET(cfg->send_cpus[idx], &stack_cpuset);
+ CPU_SET(cfg->recv_cpus[idx], &stack_cpuset);
+ }
}
for (int32_t idx = 0; idx < cfg->num_wakeup; ++idx) {
CPU_SET(cfg->wakeup[idx], &stack_cpuset);
@@ -643,6 +720,13 @@ static int32_t parse_dpdk_args(void)
goto free_dpdk_args;
}
g_config_params.dpdk_argv[start_index + i] = p;
+
+ const char *primary = "primary";
+ if(strcmp(p, primary) == 0){
+ struct cfg_params *global_params = get_global_cfg_params();
+ global_params->is_primary = 1;
+ }
+
(void)fprintf(stderr, "%s ", g_config_params.dpdk_argv[start_index + i]);
}
(void)fprintf(stderr, "\n");
@@ -877,3 +961,72 @@ static int32_t parse_unix_prefix(void)
return 0;
}
+static int32_t parse_seperate_sendrecv_args(void)
+{
+ return parse_int(&g_config_params.seperate_send_recv, "seperate_send_recv", 0, 0, 1);
+}
+
+static int32_t parse_num_process(void)
+{
+ if (g_config_params.use_ltran) {
+ return 0;
+ }
+
+ const config_setting_t *num_process = NULL;
+
+ num_process = config_lookup(&g_config, "num_process");
+ if (num_process == NULL) {
+ g_config_params.num_process = 1;
+ }else {
+ g_config_params.num_process = (uint8_t)config_setting_get_int(num_process);
+ }
+
+ g_config_params.tot_queue_num = g_config_params.num_queue * g_config_params.num_process;
+
+ return 0;
+}
+
+static int32_t parse_process_numa(void)
+{
+ const config_setting_t *cfg_args = NULL;
+ const char *args = NULL;
+
+ int ret;
+ cfg_args = config_lookup(&g_config, "process_numa");
+ if (cfg_args == NULL)
+ return 0;
+
+ args = config_setting_get_string(cfg_args);
+ if (cfg_args == NULL) {
+ return 0;
+ }
+
+ ret = separate_str_to_array((char *)args, g_config_params.process_numa, PROTOCOL_STACK_MAX);
+ if (ret <= 0) {
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int parse_process_index(void)
+{
+ if (g_config_params.use_ltran) {
+ return 0;
+ }
+
+ const config_setting_t *process_idx = NULL;
+ process_idx = config_lookup(&g_config, "process_idx");
+ if (process_idx == NULL) {
+ if (g_config_params.num_process == 1) {
+ g_config_params.process_idx = 0;
+ }else {
+ return -EINVAL;
+ }
+ } else {
+ g_config_params.process_idx = (uint8_t)config_setting_get_int(process_idx);
+ }
+
+ return 0;
+}
+
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index f60963f..1beb66b 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -19,6 +19,7 @@
#include <sys/socket.h>
#include <net/if.h>
#include <net/if_arp.h>
+#include <numa.h>
#include <rte_eal.h>
#include <rte_lcore.h>
@@ -131,7 +132,7 @@ int32_t dpdk_eal_init(void)
return ret;
}
-static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
uint32_t mbuf_cache_size, uint16_t queue_id)
{
int32_t ret;
@@ -149,7 +150,27 @@ static struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_
if (pool == NULL) {
LSTACK_LOG(ERR, LSTACK, "cannot create %s pool rte_err=%d\n", pool_name, rte_errno);
}
+
+ return pool;
+}
+
+static struct rte_mempool* get_pktmbuf_mempool(const char *name, uint16_t queue_id){
+ int32_t ret;
+ char pool_name[PATH_MAX];
+ struct rte_mempool *pool;
+
+ ret = snprintf_s(pool_name, sizeof(pool_name), PATH_MAX - 1, "%s_%hu", name, queue_id);
+ if (ret < 0) {
+ return NULL;
+ }
+ pool = rte_mempool_lookup(pool_name);
+ if (pool == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "look up %s pool rte_err=%d\n", pool_name, rte_errno);
+ }
+
+ // rte_mempool_dump(stdout, pool) ;
return pool;
+
}
static struct reg_ring_msg *create_reg_mempool(const char *name, uint16_t queue_id)
@@ -178,10 +199,7 @@ int32_t pktmbuf_pool_init(struct protocol_stack *stack, uint16_t stack_num)
return -1;
}
- stack->rxtx_pktmbuf_pool = create_pktmbuf_mempool("rxtx_mbuf",
- get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_num,
- RXTX_CACHE_SZ,
- stack->queue_id);
+ stack->rxtx_pktmbuf_pool = get_pktmbuf_mempool("rxtx_mbuf", stack->queue_id);
if (stack->rxtx_pktmbuf_pool == NULL) {
return -1;
}
@@ -201,7 +219,7 @@ struct rte_ring *create_ring(const char *name, uint32_t count, uint32_t flags, i
char ring_name[RTE_RING_NAMESIZE] = {0};
struct rte_ring *ring;
- int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d", name, queue_id);
+ int32_t ret = snprintf_s(ring_name, sizeof(ring_name), RTE_RING_NAMESIZE - 1, "%s_%d_%d", name, get_global_cfg_params()->process_idx, queue_id);
if (ret < 0) {
return NULL;
}
@@ -286,6 +304,12 @@ void lstack_log_level_init(void)
}
}
+// get port id
+inline uint16_t get_port_id(){
+ uint16_t port_id = get_global_cfg_params()->port_id;
+ return port_id;
+}
+
static int32_t ethdev_port_id(uint8_t *mac)
{
int32_t port_id;
@@ -412,89 +436,111 @@ static void rss_setup(const int port_id, const uint16_t nb_queues)
int32_t dpdk_ethdev_init(void)
{
uint16_t nb_queues = get_global_cfg_params()->num_cpu;
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
- int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
- if (port_id < 0) {
- return port_id;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ nb_queues = get_global_cfg_params()->num_cpu * 2;
}
- struct rte_eth_dev_info dev_info;
- int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
- return ret;
+ if (!use_ltran()) {
+ nb_queues = get_global_cfg_params()->tot_queue_num;
}
- int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
- if (max_queues < nb_queues) {
- LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
- return -EINVAL;
- }
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
- if (eth_params == NULL) {
- return -ENOMEM;
- }
- eth_params_checksum(ð_params->conf, &dev_info);
- int32_t rss_enable = eth_params_rss(ð_params->conf, &dev_info);
- stack_group->eth_params = eth_params;
- stack_group->port_id = eth_params->port_id;
- stack_group->rx_offload = eth_params->conf.rxmode.offloads;
- stack_group->tx_offload = eth_params->conf.txmode.offloads;
-
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack *stack = stack_group->stacks[i];
- if (likely(stack)) {
- stack->port_id = stack_group->port_id;
- } else {
- LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i);
- stack_group->eth_params = NULL;
- free(eth_params);
- return -EINVAL;
- }
- }
-
- ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, ð_params->conf);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret));
- stack_group->eth_params = NULL;
- free(eth_params);
- return ret;
- }
+ int32_t port_id = ethdev_port_id(get_global_cfg_params()->mac_addr);
+ if (port_id < 0) {
+ return port_id;
+ }
- ret = dpdk_ethdev_start();
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
- stack_group->eth_params = NULL;
- free(eth_params);
- return ret;
- }
+ struct rte_eth_dev_info dev_info;
+ int32_t ret = rte_eth_dev_info_get(port_id, &dev_info);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "get dev info ret=%d\n", ret);
+ return ret;
+ }
+
+ int32_t max_queues = LWIP_MIN(dev_info.max_rx_queues, dev_info.max_tx_queues);
+ if (max_queues < nb_queues) {
+ LSTACK_LOG(ERR, LSTACK, "port_id %d max_queues=%d\n", port_id, max_queues);
+ return -EINVAL;
+ }
- if (rss_enable) {
- rss_setup(port_id, nb_queues);
- stack_group->reta_mask = dev_info.reta_size - 1;
+ struct eth_params *eth_params = alloc_eth_params(port_id, nb_queues);
+ if (eth_params == NULL) {
+ return -ENOMEM;
+ }
+ eth_params_checksum(ð_params->conf, &dev_info);
+ int32_t rss_enable = 0;
+ if (use_ltran()) {
+ rss_enable = eth_params_rss(ð_params->conf, &dev_info);
+ }
+ stack_group->eth_params = eth_params;
+ stack_group->port_id = eth_params->port_id;
+ stack_group->rx_offload = eth_params->conf.rxmode.offloads;
+ stack_group->tx_offload = eth_params->conf.txmode.offloads;
+
+ if (get_global_cfg_params()->is_primary) {
+ for (uint32_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack *stack = stack_group->stacks[i];
+ if (likely(stack)) {
+ stack->port_id = stack_group->port_id;
+ } else {
+ LSTACK_LOG(ERR, LSTACK, "empty stack at stack_num %d\n", i);
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return -EINVAL;
+ }
+ }
+
+ ret = rte_eth_dev_configure(port_id, nb_queues, nb_queues, ð_params->conf);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "cannot config eth dev at port %d: %s\n", port_id, rte_strerror(-ret));
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return ret;
+ }
+
+ ret = dpdk_ethdev_start();
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "dpdk_ethdev_start failed\n");
+ stack_group->eth_params = NULL;
+ free(eth_params);
+ return ret;
+ }
+
+ if (rss_enable && use_ltran()) {
+ rss_setup(port_id, nb_queues);
+ stack_group->reta_mask = dev_info.reta_size - 1;
+ }
+ stack_group->nb_queues = nb_queues;
}
- stack_group->nb_queues = nb_queues;
return 0;
}
-static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, const struct protocol_stack *stack)
+static int32_t dpdk_ethdev_setup(const struct eth_params *eth_params, uint16_t idx)
{
int32_t ret;
- ret = rte_eth_rx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_rx_desc, stack->socket_id,
- ð_params->rx_conf, stack->rxtx_pktmbuf_pool);
+ struct rte_mempool *rxtx_pktmbuf_pool = get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx];
+
+ uint16_t socket_id = 0;
+ struct cfg_params * cfg = get_global_cfg_params();
+ if (!cfg->use_ltran && cfg->num_process == 1) {
+ socket_id = numa_node_of_cpu(cfg->cpus[idx]);
+ }else {
+ socket_id = cfg->process_numa[idx];
+ }
+ ret = rte_eth_rx_queue_setup(eth_params->port_id, idx, eth_params->nb_rx_desc, socket_id,
+ ð_params->rx_conf, rxtx_pktmbuf_pool);
if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret));
+ LSTACK_LOG(ERR, LSTACK, "cannot setup rx_queue %hu: %s\n", idx, rte_strerror(-ret));
return -1;
}
- ret = rte_eth_tx_queue_setup(eth_params->port_id, stack->queue_id, eth_params->nb_tx_desc, stack->socket_id,
+ ret = rte_eth_tx_queue_setup(eth_params->port_id, idx, eth_params->nb_tx_desc, socket_id,
ð_params->tx_conf);
if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", stack->queue_id, rte_strerror(-ret));
+ LSTACK_LOG(ERR, LSTACK, "cannot setup tx_queue %hu: %s\n", idx, rte_strerror(-ret));
return -1;
}
@@ -505,12 +551,9 @@ int32_t dpdk_ethdev_start(void)
{
int32_t ret;
const struct protocol_stack_group *stack_group = get_protocol_stack_group();
- const struct protocol_stack *stack = NULL;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- ret = dpdk_ethdev_setup(stack_group->eth_params, stack);
+ for (int32_t i = 0; i < get_global_cfg_params()->tot_queue_num; i++) {
+ ret = dpdk_ethdev_setup(stack_group->eth_params, i);
if (ret < 0) {
return ret;
}
@@ -529,6 +572,7 @@ int32_t dpdk_init_lstack_kni(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+
stack_group->kni_pktmbuf_pool = create_pktmbuf_mempool("kni_mbuf", KNI_NB_MBUF, 0, 0);
if (stack_group->kni_pktmbuf_pool == NULL) {
return -1;
@@ -568,7 +612,7 @@ int32_t init_dpdk_ethdev(void)
return -1;
}
- if (get_global_cfg_params()->kni_switch) {
+ if (get_global_cfg_params()->kni_switch && get_global_cfg_params()->is_primary) {
ret = dpdk_init_lstack_kni();
if (ret < 0) {
return -1;
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 34b2c0d..e8fa0dc 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -356,6 +356,8 @@ __attribute__((constructor)) void gazelle_network_init(void)
}
}
+ // @todo, check process 2 dumped, resorce need to release.
+
gazelle_signal_init();
/*
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 300d7af..48eff1d 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -13,13 +13,14 @@
#include <pthread.h>
#include <stdatomic.h>
+#include <rte_kni.h>
+
#include <lwip/sockets.h>
#include <lwip/tcpip.h>
#include <lwip/tcp.h>
#include <lwip/memp_def.h>
#include <lwipsock.h>
#include <lwip/posix_api.h>
-#include <rte_kni.h>
#include <securec.h>
#include <numa.h>
@@ -29,6 +30,7 @@
#include "lstack_log.h"
#include "lstack_dpdk.h"
#include "lstack_ethdev.h"
+#include "lstack_vdev.h"
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
#include "lstack_cfg.h"
@@ -79,6 +81,28 @@ struct protocol_stack_group *get_protocol_stack_group(void)
return &g_stack_group;
}
+int get_min_conn_stack(struct protocol_stack_group *stack_group){
+ int min_conn_stk_idx = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
+ for (int i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }
+
+ }
+ return min_conn_stk_idx;
+
+}
+
struct protocol_stack *get_protocol_stack(void)
{
return g_stack_p;
@@ -105,22 +129,23 @@ struct protocol_stack *get_bind_protocol_stack(void)
struct protocol_stack_group *stack_group = get_protocol_stack_group();
uint16_t index = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
/* close listen shadow, per app communication thread select only one stack */
- if (get_global_cfg_params()->listen_shadow == false) {
- static _Atomic uint16_t stack_index = 0;
- index = atomic_fetch_add(&stack_index, 1);
- if (index >= stack_group->stack_num) {
- LSTACK_LOG(ERR, LSTACK, "thread =%hu larger than stack num = %hu\n", index, stack_group->stack_num);
- return NULL;
- }
- /* use listen shadow, app communication thread maybe more than stack num, select the least load stack */
- } else {
- for (uint16_t i = 1; i < stack_group->stack_num; i++) {
- if (stack_group->stacks[i]->conn_num < stack_group->stacks[index]->conn_num) {
+ for (uint16_t i = 0; i < stack_group->stack_num; i++) {
+ struct protocol_stack* stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (stack->is_send_thread && stack->conn_num < min_conn_num) {
index = i;
+ min_conn_num = stack->conn_num;
+ }
+ }else {
+ if (stack->conn_num < min_conn_num) {
+ index = i;
+ min_conn_num = stack->conn_num;
}
}
+
}
bind_stack = stack_group->stacks[index];
@@ -180,27 +205,35 @@ void low_power_idling(struct protocol_stack *stack)
}
}
-static int32_t create_thread(uint16_t queue_id, char *thread_name, stack_thread_func func)
+static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func)
{
/* thread may run slow, if arg is temp var maybe have relese */
- static uint16_t queue[PROTOCOL_STACK_MAX];
char name[PATH_MAX];
pthread_t tid;
int32_t ret;
+ struct thread_params *t_params = (struct thread_params*) arg;
- if (queue_id >= PROTOCOL_STACK_MAX) {
- LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", queue_id, PROTOCOL_STACK_MAX);
+ if (t_params->queue_id >= PROTOCOL_STACK_MAX) {
+ LSTACK_LOG(ERR, LSTACK, "queue_id is %hu exceed max=%d\n", t_params->queue_id, PROTOCOL_STACK_MAX);
return -1;
}
- queue[queue_id] = queue_id;
- ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, queue[queue_id]);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "set name failed\n");
- return -1;
+ if (get_global_cfg_params()->seperate_send_recv){
+ ret = sprintf_s(name, sizeof(name), "%s", thread_name);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "set name failed\n");
+ return -1;
+ }
+
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s%02hu", thread_name, t_params->queue_id);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "set name failed\n");
+ return -1;
+ }
}
- ret = pthread_create(&tid, NULL, func, &queue[queue_id]);
+ ret = pthread_create(&tid, NULL, func, arg);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "pthread_create ret=%d\n", ret);
return -1;
@@ -221,7 +254,7 @@ static void* gazelle_wakeup_thread(void *arg)
struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
struct cfg_params *cfg = get_global_cfg_params();
- int32_t lcore_id = cfg->wakeup[stack->queue_id];
+ int32_t lcore_id = cfg->wakeup[stack->stack_idx];
thread_affinity_init(lcore_id);
struct timespec st = {
@@ -252,12 +285,13 @@ static void* gazelle_wakeup_thread(void *arg)
static void* gazelle_kernelevent_thread(void *arg)
{
- uint16_t queue_id = *(uint16_t *)arg;
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+ struct thread_params *t_params = (struct thread_params*) arg;
+ uint16_t idx = t_params->idx;
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[idx];
bind_to_stack_numa(stack);
- LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", queue_id);
+ LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", idx);
for (;;) {
stack->kernel_event_num = posix_api->epoll_wait_fn(stack->epollfd, stack->kernel_events, KERNEL_EPOLL_MAX, -1);
@@ -269,13 +303,14 @@ static void* gazelle_kernelevent_thread(void *arg)
return NULL;
}
-static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
+static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
{
+ struct thread_params *t_params = (struct thread_params*) arg;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
stack->tid = rte_gettid();
- stack->queue_id = queue_id;
- stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
+ stack->queue_id = t_params->queue_id;
+ stack->stack_idx = t_params->idx;
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
@@ -284,14 +319,27 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
sys_calibrate_tsc();
stack_stat_init();
- stack_group->stacks[queue_id] = stack;
- set_stack_idx(queue_id);
+ stack_group->stacks[t_params->idx] = stack;
+ set_stack_idx(t_params->idx);
stack->epollfd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
if (stack->epollfd < 0) {
return -1;
}
+ int idx = t_params->idx;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (idx % 2 == 0) {
+ stack->cpu_id = get_global_cfg_params()->recv_cpus[idx/2];
+ stack->is_send_thread = 0;
+ }else {
+ stack->cpu_id = get_global_cfg_params()->send_cpus[idx/2];
+ stack->is_send_thread = 1;
+ }
+ }else {
+ stack->cpu_id = get_global_cfg_params()->cpus[idx];
+ }
+
stack->socket_id = numa_node_of_cpu(stack->cpu_id);
if (stack->socket_id < 0) {
LSTACK_LOG(ERR, LSTACK, "numa_node_of_cpu failed\n");
@@ -317,16 +365,17 @@ void wait_sem_value(sem_t *sem, int32_t wait_value)
} while (sem_val < wait_value);
}
-static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable)
+static int32_t create_affiliate_thread(void *arg, bool wakeup_enable)
{
+
if (wakeup_enable) {
- if (create_thread(queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) {
+ if (create_thread(arg, "gazelleweakup", gazelle_wakeup_thread) != 0) {
LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno);
return -1;
}
}
- if (create_thread(queue_id, "gazellekernel", gazelle_kernelevent_thread) != 0) {
+ if (create_thread(arg, "gazellekernel", gazelle_kernelevent_thread) != 0) {
LSTACK_LOG(ERR, LSTACK, "gazellekernel errno=%d\n", errno);
return -1;
}
@@ -334,10 +383,9 @@ static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable)
return 0;
}
-static struct protocol_stack *stack_thread_init(uint16_t queue_id)
+static struct protocol_stack *stack_thread_init(void *arg)
{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
-
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
struct protocol_stack *stack = calloc(1, sizeof(*stack));
if (stack == NULL) {
LSTACK_LOG(ERR, LSTACK, "malloc stack failed\n");
@@ -345,14 +393,14 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id)
return NULL;
}
- if (init_stack_value(stack, queue_id) != 0) {
+ if (init_stack_value(stack, arg) != 0) {
goto END;
}
if (init_stack_numa_cpuset(stack) < 0) {
goto END;
}
- if (create_affiliate_thread(queue_id, stack_group->wakeup_enable) < 0) {
+ if (create_affiliate_thread(arg, stack_group->wakeup_enable) < 0) {
goto END;
}
@@ -402,19 +450,22 @@ static void wakeup_kernel_event(struct protocol_stack *stack)
}
__atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE);
- if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
- list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
+ if (list_is_null(&wakeup->wakeup_list[stack->stack_idx])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->stack_idx]);
}
}
stack->kernel_event_num = 0;
}
+
static void* gazelle_stack_thread(void *arg)
{
- uint16_t queue_id = *(uint16_t *)arg;
+ struct thread_params *t_params = (struct thread_params*) arg;
+
+ uint16_t queue_id = t_params->queue_id;
struct cfg_params *cfg = get_global_cfg_params();
- bool use_ltran_flag = cfg->use_ltran;;
+ uint8_t use_ltran_flag = cfg->use_ltran;;
bool kni_switch = cfg->kni_switch;
uint32_t read_connect_number = cfg->read_connect_number;
uint32_t rpc_number = cfg->rpc_number;
@@ -424,7 +475,8 @@ static void* gazelle_stack_thread(void *arg)
struct protocol_stack_group *stack_group = get_protocol_stack_group();
bool wakeup_thread_enable = stack_group->wakeup_enable;
- struct protocol_stack *stack = stack_thread_init(queue_id);
+ struct protocol_stack *stack = stack_thread_init(arg);
+
if (stack == NULL) {
/* exit in main thread, avoid create mempool and exit at the same time */
set_init_fail();
@@ -432,8 +484,12 @@ static void* gazelle_stack_thread(void *arg)
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%hu\n", queue_id);
return NULL;
}
+ if (!use_ltran() && queue_id == 0) {
+ init_listen_and_user_ports();
+ }
sem_post(&stack_group->all_init);
+
LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id);
for (;;) {
@@ -452,6 +508,7 @@ static void* gazelle_stack_thread(void *arg)
* so processing KNI requests only in the thread with queue_id No.0 is sufficient. */
if (kni_switch && !queue_id && !(wakeup_tick & 0xfff)) {
rte_kni_handle_request(get_gazelle_kni());
+ kni_handle_rx(get_port_id());
}
wakeup_tick++;
@@ -466,6 +523,11 @@ static void* gazelle_stack_thread(void *arg)
return NULL;
}
+static void libnet_listen_thread(void *arg){
+ struct cfg_params * cfg_param = get_global_cfg_params();
+ recv_pkts_from_other_process(cfg_param->process_idx, arg);
+}
+
static int32_t init_protocol_sem(void)
{
int32_t ret;
@@ -498,8 +560,14 @@ int32_t init_protocol_stack(void)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
int32_t ret;
+ char name[PATH_MAX];
+
+ if (!get_global_cfg_params()->seperate_send_recv) {
+ stack_group->stack_num = get_global_cfg_params()->num_cpu;
+ }else {
+ stack_group->stack_num = get_global_cfg_params()->num_cpu * 2;
+ }
- stack_group->stack_num = get_global_cfg_params()->num_cpu;
stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
init_list_node(&stack_group->poll_list);
pthread_spin_init(&stack_group->poll_list_lock, PTHREAD_PROCESS_PRIVATE);
@@ -508,9 +576,43 @@ int32_t init_protocol_stack(void)
if (init_protocol_sem() != 0) {
return -1;
}
+ int queue_num = get_global_cfg_params()->num_queue;
+ struct thread_params *t_params[queue_num];
+ int process_index = get_global_cfg_params()->process_idx;
+
+ if (get_global_cfg_params()->is_primary) {
+ for (uint16_t idx = 0; idx < get_global_cfg_params()->tot_queue_num; idx++) {
+ struct rte_mempool* rxtx_mbuf = create_pktmbuf_mempool("rxtx_mbuf",
+ get_global_cfg_params()->mbuf_count_per_conn * get_global_cfg_params()->tcp_conn_count / stack_group->stack_num, RXTX_CACHE_SZ, idx);
+ get_protocol_stack_group()->total_rxtx_pktmbuf_pool[idx] = rxtx_mbuf;
+ }
+ }
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- ret = create_thread(i, "gazellestack", gazelle_stack_thread);
+ for (uint32_t i = 0; i < queue_num; i++) {
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (i % 2 == 0) {
+ ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_RECV_THREAD_NAME, process_index, i/2);
+ if (ret < 0) {
+ return -1;
+ }
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s_%d_%d", LSTACK_SEND_THREAD_NAME, process_index, i/2);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+ }else {
+ ret = sprintf_s(name, sizeof(name), "%s", LSTACK_THREAD_NAME);
+ if (ret < 0) {
+ return -1;
+ }
+ }
+
+ t_params[i] = malloc(sizeof(struct thread_params));
+ t_params[i]->idx = i;
+ t_params[i]->queue_id = process_index * queue_num + i;
+
+ ret = create_thread((void *)t_params[i], name, gazelle_stack_thread);
if (ret != 0) {
return ret;
}
@@ -518,6 +620,20 @@ int32_t init_protocol_stack(void)
wait_sem_value(&stack_group->thread_phase1, stack_group->stack_num);
+ for(int idx = 0; idx < queue_num; idx++){
+ free(t_params[idx]);
+ }
+
+ if (!use_ltran()) {
+ ret = sem_init(&stack_group->sem_listen_thread, 0, 0);
+ ret = sprintf_s(name, sizeof(name), "%s", "listen_thread");
+ struct sys_thread *thread = sys_thread_new(name, libnet_listen_thread, (void*)(&stack_group->sem_listen_thread), 0, 0);
+ free(thread);
+ sem_wait(&stack_group->sem_listen_thread);
+
+ create_flow_rule_map();
+ }
+
if (get_init_fail()) {
return -1;
}
@@ -684,7 +800,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
for (int32_t i = 0; i < stack_group->stack_num; i++) {
stack = stack_group->stacks[i];
- if (cur_stack == stack) {
+ if (cur_stack == stack && use_ltran()) {
continue;
}
@@ -718,7 +834,7 @@ void stack_clean_epoll(struct rpc_msg *msg)
struct protocol_stack *stack = get_protocol_stack();
struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
- list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
+ list_del_node_null(&wakeup->wakeup_list[stack->stack_idx]);
}
/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
@@ -769,8 +885,13 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
}
struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ int min_conn_stk_idx = get_min_conn_stack(stack_group);
+
for (int32_t i = 0; i < stack_group->stack_num; ++i) {
stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
+ continue;
+ }
if (stack != cur_stack) {
clone_fd = rpc_call_shadow_fd(stack, fd, &addr, sizeof(addr));
if (clone_fd < 0) {
@@ -781,6 +902,12 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
clone_fd = fd;
}
+ if (min_conn_stk_idx == i) {
+ get_socket_by_fd(clone_fd)->conn->is_master_fd = 1;
+ }else {
+ get_socket_by_fd(clone_fd)->conn->is_master_fd = 0;
+ }
+
ret = rpc_call_listen(clone_fd, backlog);
if (ret < 0) {
stack_broadcast_close(fd);
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 2705fee..942c0b7 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -65,6 +65,8 @@ struct cfg_params {
uint8_t mac_addr[ETHER_ADDR_LEN];
uint16_t num_cpu;
uint32_t cpus[CFG_MAX_CPUS];
+ uint32_t send_cpus[CFG_MAX_CPUS];
+ uint32_t recv_cpus[CFG_MAX_CPUS];
uint16_t num_wakeup;
uint32_t wakeup[CFG_MAX_CPUS];
uint8_t num_ports;
@@ -79,11 +81,22 @@ struct cfg_params {
uint32_t read_connect_number;
uint32_t rpc_number;
uint32_t nic_read_number;
- bool use_ltran; // ture:lstack read from nic false:read form ltran
+ uint8_t use_ltran; // true:lstack read from nic false:read from ltran
+
+ uint16_t num_process;
+ uint16_t num_listen_port;
+ uint16_t port_id;
+ uint16_t is_primary;
+ uint16_t num_queue;
+ uint16_t tot_queue_num;
+ uint8_t process_idx;
+ uint32_t process_numa[PROTOCOL_STACK_MAX];
+
bool kni_switch;
bool listen_shadow; // true:listen in all stack thread. false:listen in one stack thread.
bool app_bind_numa;
bool main_thread_affinity;
+ bool seperate_send_recv;
int dpdk_argc;
char **dpdk_argv;
struct secondary_attach_arg sec_attach_arg;
@@ -94,7 +107,7 @@ struct cfg_params {
struct cfg_params *get_global_cfg_params(void);
-static inline bool use_ltran(void)
+static inline uint8_t use_ltran(void)
{
return get_global_cfg_params()->use_ltran;
}
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index c3bc527..55ca7a1 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -51,5 +51,6 @@ void dpdk_skip_nic_init(void);
int32_t dpdk_init_lstack_kni(void);
void dpdk_restore_pci(void);
bool port_in_stack_queue(uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
-
+uint16_t get_port_id();
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,uint32_t mbuf_cache_size, uint16_t queue_id);
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 0b53cde..a690adb 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -13,6 +13,19 @@
#ifndef __GAZELLE_ETHDEV_H__
#define __GAZELLE_ETHDEV_H__
+#define INVAILD_PROCESS_IDX 255
+
+enum port_type {
+ PORT_LISTEN,
+ PORT_CONNECT,
+};
+
+enum PACKET_TRANSFER_TYPE{
+ TRANSFER_KERNEL = -1,
+ TRANSFER_OTHER_THREAD,
+ TRANSFER_CURRENT_THREAD,
+};
+
struct protocol_stack;
struct rte_mbuf;
struct lstack_dev_ops {
@@ -22,7 +35,15 @@ struct lstack_dev_ops {
int32_t ethdev_init(struct protocol_stack *stack);
int32_t eth_dev_poll(void);
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number);
+int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number);
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack);
+int recv_pkts_from_other_process(int process_index, void* arg);
+void create_flow_rule_map();
+void kni_handle_rx(uint16_t port_id);
+void delete_user_process_port(uint16_t dst_port, enum port_type type);
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
+void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+
#endif /* __GAZELLE_ETHDEV_H__ */
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 11b001c..b5d3f7d 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -41,6 +41,7 @@ struct protocol_stack {
uint16_t port_id;
uint16_t socket_id;
uint16_t cpu_id;
+ uint32_t stack_idx;
cpu_set_t idle_cpuset; /* idle cpu in numa of stack, app thread bind to it */
int32_t epollfd; /* kernel event thread epoll fd */
@@ -53,6 +54,8 @@ struct protocol_stack {
uint32_t reg_head;
volatile bool low_power;
+ bool is_send_thread;
+
lockless_queue rpc_queue __rte_cache_aligned;
char pad __rte_cache_aligned;
@@ -93,6 +96,8 @@ struct protocol_stack_group {
bool wakeup_enable;
struct list_node poll_list;
pthread_spinlock_t poll_list_lock;
+ sem_t sem_listen_thread;
+ struct rte_mempool *total_rxtx_pktmbuf_pool[PROTOCOL_STACK_MAX];
/* dfx stats */
bool latency_start;
@@ -131,6 +136,10 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
void stack_send_pkts(struct protocol_stack *stack);
struct rpc_msg;
+struct thread_params{
+ uint16_t queue_id;
+ uint16_t idx;
+};
void stack_clean_epoll(struct rpc_msg *msg);
void stack_arp(struct rpc_msg *msg);
void stack_socket(struct rpc_msg *msg);
@@ -146,4 +155,5 @@ void stack_getsockopt(struct rpc_msg *msg);
void stack_setsockopt(struct rpc_msg *msg);
void stack_fcntl(struct rpc_msg *msg);
void stack_ioctl(struct rpc_msg *msg);
+void kni_handle_tx(struct rte_mbuf *mbuf);
#endif
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 0693c4d..0995277 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -21,4 +21,11 @@ enum reg_ring_type;
void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops);
int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple);
+int recv_pkts_from_other_process(int process_index, void* arg);
+void create_flow_rule_map();
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
+void init_listen_and_user_ports();
+
#endif /* _GAZELLE_VDEV_H_ */
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index cf81954..389a81c 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -49,3 +49,7 @@ host_addr="192.168.1.10"
mask_addr="255.255.255.0"
gateway_addr="192.168.1.1"
devices="aa:bb:cc:dd:ee:ff"
+
+num_process=2
+process_numa="0,1"
+process_idx=0
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 1441f64..60ea897 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -10,14 +10,22 @@
* See the Mulan PSL v2 for more details.
*/
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <rte_kni.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <lwip/debug.h>
#include <lwip/etharp.h>
+#include <lwip/posix_api.h>
#include <netif/ethernet.h>
+#include "lwip/tcp.h"
+#include <lwip/prot/tcp.h>
#include <securec.h>
+#include <rte_jhash.h>
#include "lstack_cfg.h"
#include "lstack_vdev.h"
@@ -28,9 +36,33 @@
#include "dpdk_common.h"
#include "lstack_protocol_stack.h"
#include "lstack_ethdev.h"
+#include "lstack_thread_rpc.h"
/* FRAME_MTU + 14byte header */
#define MBUF_MAX_LEN 1514
+#define MAX_PATTERN_NUM 4
+#define MAX_ACTION_NUM 2
+#define FULL_MASK 0xffffffff /* full mask */
+#define EMPTY_MASK 0x0 /* empty mask */
+#define LSTACK_MBUF_LEN 64
+#define TRANSFER_TCP_MUBF_LEN LSTACK_MBUF_LEN + 3
+#define DELETE_FLOWS_PARAMS_NUM 3
+#define DELETE_FLOWS_PARAMS_LENGTH 30
+#define CREATE_FLOWS_PARAMS_NUM 6
+#define CREATE_FLOWS_PARAMS_LENGTH 60
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
+#define REPLY_LEN 10
+#define SUCCESS_REPLY "success"
+#define ERROR_REPLY "error"
+#define PACKET_READ_SIZE 32
+
+char *client_path = "/var/run/gazelle/client.socket";
+char *server_path = "/var/run/gazelle/server.socket";
+const char *split_delim = ",";
+
+uint8_t g_user_ports[65535] = {INVAILD_PROCESS_IDX,};
+uint8_t g_listen_ports[65535] = {INVAILD_PROCESS_IDX,};
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
{
@@ -45,6 +77,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
struct rte_mbuf *next_m = NULL;
pkt_len = (uint16_t)rte_pktmbuf_pkt_len(m);
+
while (m != NULL) {
len = (uint16_t)rte_pktmbuf_data_len(m);
payload = rte_pktmbuf_mtod(m, void *);
@@ -82,6 +115,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
}
}
+
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
@@ -115,8 +149,507 @@ int32_t eth_dev_poll(void)
return nr_pkts;
}
+void init_listen_and_user_ports(){
+ memset(g_user_ports, INVAILD_PROCESS_IDX, sizeof(g_user_ports));
+ memset(g_listen_ports, INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
+}
+
+int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
+{
+ /* other process queue_id */
+ struct sockaddr_un serun;
+ int sockfd;
+ int ret = 0;
+
+ if ((sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ return -1;
+ }
+
+ memset(&serun, 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ sprintf_s(serun.sun_path, PATH_MAX,"%s%d", server_path, process_index);
+ int len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0){
+ return -1;
+ }
+ posix_api->write_fn(sockfd, buf, write_len);
+ if (need_reply) {
+ char reply_message[REPLY_LEN];
+ posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
+ if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
+ ret = 0;
+ }else {
+ ret = -1;
+ }
+ }
+ posix_api->close_fn(sockfd);
+
+ return ret;
+}
+
+struct rte_flow *
+create_flow_director(uint16_t port_id, uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip,
+ uint16_t src_port, uint16_t dst_port, struct rte_flow_error *error)
+{
+ struct rte_flow_attr attr;
+ struct rte_flow_item pattern[MAX_PATTERN_NUM];
+ struct rte_flow_action action[MAX_ACTION_NUM];
+ struct rte_flow *flow = NULL;
+ struct rte_flow_action_queue queue = { .index = queue_id };
+ struct rte_flow_item_ipv4 ip_spec;
+ struct rte_flow_item_ipv4 ip_mask;
+
+ struct rte_flow_item_tcp tcp_spec;
+ struct rte_flow_item_tcp tcp_mask;
+ int res;
+
+ memset(pattern, 0, sizeof(pattern));
+ memset(action, 0, sizeof(action));
+
+ /*
+ * set the rule attribute.
+ * in this case only ingress packets will be checked.
+ */
+ memset(&attr, 0, sizeof(struct rte_flow_attr));
+ attr.ingress = 1;
+
+ /*
+ * create the action sequence.
+ * one action only, move packet to queue
+ */
+ action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
+ action[0].conf = &queue;
+ action[1].type = RTE_FLOW_ACTION_TYPE_END;
+
+ // not limit eth header
+ pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
+
+ // ip header
+ memset(&ip_spec, 0, sizeof(struct rte_flow_item_ipv4));
+ memset(&ip_mask, 0, sizeof(struct rte_flow_item_ipv4));
+ ip_spec.hdr.dst_addr = dst_ip;
+ ip_mask.hdr.dst_addr = FULL_MASK;
+ ip_spec.hdr.src_addr = src_ip;
+ ip_mask.hdr.src_addr = FULL_MASK;
+ pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
+ pattern[1].spec = &ip_spec;
+ pattern[1].mask = &ip_mask;
+
+ // tcp header, full mask 0xffff
+ memset(&tcp_spec, 0, sizeof(struct rte_flow_item_tcp));
+ memset(&tcp_mask, 0, sizeof(struct rte_flow_item_tcp));
+ pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP;
+ tcp_spec.hdr.src_port = src_port;
+ tcp_spec.hdr.dst_port = dst_port;
+ tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
+ tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
+ pattern[2].spec = &tcp_spec;
+ pattern[2].mask = &tcp_mask;
+
+ /* the final level must be always type end */
+ pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
+ res = rte_flow_validate(port_id, &attr, pattern, action, error);
+ if (!res){
+ flow = rte_flow_create(port_id, &attr, pattern, action, error);
+ }else {
+ LSTACK_LOG(ERR, PORT,"rte_flow_create.rte_flow_validate error, res %d \n", res);
+ }
+
+ return flow;
+}
+
+void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port){
+
+ uint16_t port_id = get_port_id();
+
+ LSTACK_LOG(INFO, LSTACK, "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs :%u \n",
+ queue_id, src_ip,ntohs(src_port), ntohs(dst_port) );
+
+ struct rte_flow_error error;
+ struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
+ if (!flow) {
+ LSTACK_LOG(ERR, LSTACK,"config_flow_director, flow can not be created. queue_id %u, src_ip %u, src_port %u, dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
+ queue_id, src_ip,src_port,dst_port,ntohs(dst_port), error.type, error.message ? error.message : "(no stated reason)");
+ return;
+ }
+}
+
+void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ uint16_t port_id = get_port_id();
+ (void)port_id;
+}
+
+/*
+ * delete flows
+ * if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0.
+ */
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ if (get_global_cfg_params()->is_primary){
+ delete_flow_director(dst_ip, src_port, dst_port);
+ }else {
+ char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
+ sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u", dst_ip,split_delim, src_port,split_delim,dst_port);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_delete_rule_info_to_process0 error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
+ rte_gettid(), dst_ip, src_port, dst_port);
+ }
+ }
+}
+
+/*
+ * add flows
+ * if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0.
+ */
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
+ /* exchange src_ip and dst_ip, src_port and dst_port */
+ uint8_t process_idx = get_global_cfg_params()->process_idx;
+ sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
+ dst_ip,split_delim,src_ip,split_delim, dst_port,split_delim,src_port, split_delim,queue_id,split_delim,process_idx);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_create_rule_info_to_process0 error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u, queue_id %u, process_idx %u\n",
+ rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
+ }
+}
+
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
+{
+ char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
+ sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, "%u%s%u%s%u", listen_port,split_delim,process_idx, split_delim, is_add);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
+ if(ret != 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer_add_or_delete_listen_port_to_process0 error. tid %d. listen_port %u, process_idx %u\n",
+ rte_gettid(), listen_port, process_idx);
+ }
+}
+
+static int str_to_array(char *args, uint32_t *array, int size)
+{
+ int val;
+ uint16_t cnt = 0;
+ char *elem = NULL;
+ char *next_token = NULL;
+
+ memset(array, 0, sizeof(*array) * size);
+ elem = strtok_s((char *)args, split_delim, &next_token);
+ while (elem != NULL) {
+ if (cnt >= size) {
+ return -1;
+ }
+ val = atoi(elem);
+ if (val < 0) {
+ return -1;
+ }
+ array[cnt] = (uint32_t)val;
+ cnt++;
+
+ elem = strtok_s(NULL, split_delim, &next_token);
+ }
+
+ return cnt;
+}
+
+void parse_and_delete_rule(char* buf)
+{
+ uint32_t array[DELETE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
+ uint32_t dst_ip = array[0];
+ uint16_t src_port = array[1];
+ uint16_t dst_port = array[2];
+ delete_flow_director(dst_ip, src_port, dst_port);
+}
+
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type){
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = process_idx;
+ }else {
+ g_user_ports[dst_port] = process_idx;
+ }
+}
+
+void delete_user_process_port(uint16_t dst_port, enum port_type type){
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
+ }else {
+ g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
+ }
+}
+
+void parse_and_create_rule(char* buf)
+{
+ uint32_t array[CREATE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
+ uint32_t src_ip = array[0];
+ uint32_t dst_ip = array[1];
+ uint16_t src_port = array[2];
+ uint16_t dst_port = array[3];
+ uint16_t queue_id = array[4];
+ uint8_t process_idx = array[5];
+ config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
+ add_user_process_port(dst_port, process_idx, PORT_CONNECT);
+}
+
+void parse_and_add_or_delete_listen_port(char* buf)
+{
+ uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
+ str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
+ uint16_t listen_port = array[0];
+ uint8_t process_idx = array[1];
+ uint8_t is_add = array[2];
+ if (is_add == 1) {
+ add_user_process_port(listen_port,process_idx, PORT_LISTEN);
+ }else {
+ delete_user_process_port(listen_port, PORT_LISTEN);
+ }
+
+}
+
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
+{
+ struct cfg_params *cfgs = get_global_cfg_params();
+
+ for(int i = 1; i < cfgs->num_process; i++){
+ char arp_mbuf[LSTACK_MBUF_LEN] = {0};
+ sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
+ int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
+ if(result < 0){
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. \n", i);
+ }
+ }
+}
+
+void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
+{
+ /* current process queue_id */
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
+ int ret = -1;
+ while(ret != 0) {
+ ret = rpc_call_arp(stack, mbuf);
+ printf("transfer_tcp_to_thread, ret : %d \n", ret);
+ }
+}
+
+void parse_arp_and_transefer(char* buf)
+{
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = NULL;
+ int32_t ret;
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ while (ret != 0) {
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+
+ ret = rpc_call_arp(stack, mbuf_copy);
+
+ while (ret != 0) {
+ rpc_call_arp(stack, mbuf_copy);;
+ }
+ }
+}
+
+void parse_tcp_and_transefer(char* buf)
+{
+ char *next_token = NULL;
+ char *elem = strtok_s(buf, split_delim, &next_token);
+ struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
+ elem = strtok_s(NULL, split_delim, &next_token);
+ uint16_t queue_id = atoll(elem);
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t num_queue = get_global_cfg_params()->num_queue;
+ uint16_t stk_index = queue_id % num_queue;
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = stack_group->stacks[stk_index];
+
+ int32_t ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ while (ret != 0) {
+ ret = gazelle_alloc_pktmbuf(stack->rxtx_pktmbuf_pool, &mbuf_copy, 1);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+
+ copy_mbuf(mbuf_copy,mbuf);
+
+ transfer_tcp_to_thread(mbuf_copy, stk_index);
+}
+
+int recv_pkts_from_other_process(int process_index, void* arg){
+ struct sockaddr_un serun, cliun;
+ socklen_t cliun_len;
+ int listenfd, connfd, size;
+ char buf[132];
+ /* socket */
+ if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ perror("socket error");
+ return -1;
+ }
+ /* bind */
+ memset(&serun, 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ char process_server_path[PATH_MAX];
+ sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", server_path, process_index);
+ strcpy(serun.sun_path, process_server_path);
+ size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ unlink(process_server_path);
+ if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
+ perror("bind error");
+ return -1;
+ }
+ if (posix_api->listen_fn(listenfd, 20) < 0) {
+ perror("listen error");
+ return -1;
+ }
+ sem_post((sem_t *)arg);
+ /* block */
+ while(1) {
+ cliun_len = sizeof(cliun);
+ if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0){
+ perror("accept error");
+ continue;
+ }
+ while(1) {
+ int n = posix_api->read_fn(connfd, buf, sizeof(buf));
+ if (n < 0) {
+ perror("read error");
+ break;
+ } else if(n == 0) {
+ break;
+ }
+
+ if(n == LSTACK_MBUF_LEN){
+ /* arp */
+ parse_arp_and_transefer(buf);
+ }else if(n == TRANSFER_TCP_MUBF_LEN) {
+ /* tcp. lstack_mbuf_queue_id */
+ printf("recv_pkts_from_other_process, process idx %d \n ", process_index);
+ parse_tcp_and_transefer(buf);
+ }else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
+ /* delete rule */
+ parse_and_delete_rule(buf);
+ }else if(n == CREATE_FLOWS_PARAMS_LENGTH){
+ /* add rule */
+ parse_and_create_rule(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ }else {
+ /* add port */
+ parse_and_add_or_delete_listen_port(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ }
+
+ }
+ posix_api->close_fn(connfd);
+ }
+ posix_api->close_fn(listenfd);
+ return 0;
+}
+
+void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id, char* mbuf_and_queue_id, int write_len){
+
+ sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf,split_delim,queue_id);
+}
+
+const int32_t ipv4_version_offset = 4;
+const int32_t ipv4_version = 4;
+
+int distribute_pakages(struct rte_mbuf *mbuf)
+{
+ struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
+ uint8_t ip_version = (iph->version_ihl & 0xf0) >> ipv4_version_offset;
+ if (likely(ip_version == ipv4_version)) {
+ if (likely(iph->next_proto_id == IPPROTO_TCP)) {
+ int each_process_queue_num = get_global_cfg_params()->num_queue;
+
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *, sizeof(struct rte_ether_hdr) +
+ sizeof(struct rte_ipv4_hdr));
+ uint16_t dst_port = tcp_hdr->dst_port;
+
+ int user_process_idx = (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) ? g_listen_ports[dst_port] : g_user_ports[dst_port];
+
+ if (user_process_idx == INVAILD_PROCESS_IDX) {
+ return TRANSFER_KERNEL;
+ }
+ if(unlikely(tcp_hdr->tcp_flags == TCP_SYN)){
+ uint32_t src_ip = iph->src_addr;
+ uint16_t src_port = tcp_hdr->src_port;
+ uint32_t index = rte_jhash_3words(src_ip, src_port | (dst_port) << 16, 0, 0) % each_process_queue_num;
+ uint16_t queue_id = 0;
+ if (get_global_cfg_params()->seperate_send_recv) {
+ queue_id = user_process_idx * each_process_queue_num + (index/2) * 2;
+ }else {
+ queue_id = user_process_idx * each_process_queue_num + index;
+ }
+ if(queue_id != 0){
+ if(user_process_idx == 0){
+ transfer_tcp_to_thread(mbuf, queue_id);
+ }else {
+ char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
+ concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
+ transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx, TRANSFER_TCP_MUBF_LEN, false);
+ }
+ return TRANSFER_OTHER_THREAD;
+ }else {
+ return TRANSFER_CURRENT_THREAD;
+ }
+ }else {
+ return TRANSFER_CURRENT_THREAD;
+ }
+ }
+ }else {
+ return TRANSFER_KERNEL;
+ }
+ return TRANSFER_KERNEL;
+}
+
+void kni_handle_rx(uint16_t port_id)
+{
+ struct rte_mbuf *pkts_burst[PACKET_READ_SIZE];
+ uint32_t nb_kni_rx = rte_kni_rx_burst(get_gazelle_kni(), pkts_burst, PACKET_READ_SIZE);
+ if (nb_kni_rx > 0) {
+ uint16_t nb_rx = rte_eth_tx_burst(port_id, 0, pkts_burst, nb_kni_rx);
+ for (uint16_t i = nb_rx; i < nb_kni_rx; ++i) {
+ rte_pktmbuf_free(pkts_burst[i]);
+ }
+ }
+ return;
+}
+
+void kni_handle_tx(struct rte_mbuf *mbuf)
+{
+ if (!get_global_cfg_params()->kni_switch) {
+ return;
+ }
+ struct rte_ipv4_hdr *ipv4_hdr;
+ uint16_t l3_offset = mbuf->l2_len;
+
+ ipv4_hdr = (struct rte_ipv4_hdr *)(rte_pktmbuf_mtod(mbuf, char*) +
+ l3_offset);
+ if (mbuf->nb_segs > 1) {
+ ipv4_hdr->hdr_checksum = 0;
+ ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
+ }
+
+ // forward the packet to the kernel network stack via KNI
+ if (!rte_kni_tx_burst(get_gazelle_kni(), &mbuf, 1)) {
+ rte_pktmbuf_free(mbuf);
+ }
+}
+
/* optimized eth_dev_poll() in lstack */
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag, uint32_t nic_read_number)
+int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number)
{
uint32_t nr_pkts;
@@ -131,15 +664,33 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, bool use_ltran_flag,
}
for (uint32_t i = 0; i < nr_pkts; i++) {
+ /* TRANSFER_CURRENT_THREAD(1): current thread recv; TRANSFER_OTHER_THREAD(0): other thread recv; TRANSFER_KERNEL(-1): kni recv */
+ int transfer_type = TRANSFER_CURRENT_THREAD;
/* copy arp into other stack */
if (!use_ltran_flag) {
struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) {
stack_broadcast_arp(stack->pkts[i], stack);
+ if (!use_ltran_flag) {
+ // copy arp into other process
+ transfer_arp_to_other_process(stack->pkts[i]);
+ transfer_type = TRANSFER_KERNEL;
+ }
+ }else {
+ if (!use_ltran_flag && stack->queue_id == 0) {
+ transfer_type = distribute_pakages(stack->pkts[i]);
+ }
}
}
- eth_dev_recv(stack->pkts[i], stack);
+ if (likely(transfer_type == TRANSFER_CURRENT_THREAD)) {
+ eth_dev_recv(stack->pkts[i], stack);
+
+ }else if (transfer_type == TRANSFER_KERNEL) {
+ kni_handle_tx(stack->pkts[i]);
+ }else {
+ /*transfer to other thread*/
+ }
}
stack->stats.rx += nr_pkts;
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index 3d1204e..1752853 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -102,7 +102,9 @@ static uint32_t vdev_rx_poll(struct protocol_stack *stack, struct rte_mbuf **pkt
pkts[i]->packet_type = RTE_PTYPE_L3_IPV4 | RTE_PTYPE_L4_TCP;
}
- return rte_gro_reassemble_burst(pkts, pkt_num, &gro_param);
+ pkt_num = rte_gro_reassemble_burst(pkts, pkt_num, &gro_param);
+
+ return pkt_num;
}
static uint32_t ltran_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkts, uint32_t nr_pkts)
@@ -145,14 +147,51 @@ static uint32_t vdev_tx_xmit(struct protocol_stack *stack, struct rte_mbuf **pkt
int32_t vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple)
{
- if (!use_ltran()) {
- return 0;
- }
-
if (qtuple == NULL) {
return -1;
}
+ if (!use_ltran()) {
+ if(type == REG_RING_TCP_LISTEN_CLOSE){
+ if (get_global_cfg_params()->is_primary) {
+ delete_user_process_port(qtuple->src_port, PORT_LISTEN);
+ }else{
+ transfer_add_or_delete_listen_port_to_process0(qtuple->src_port,get_global_cfg_params()->process_idx, 0);
+ }
+ }
+
+ if (type == REG_RING_TCP_CONNECT_CLOSE) {
+ if (get_global_cfg_params()->is_primary) {
+ delete_user_process_port(qtuple->src_port, PORT_CONNECT);
+ delete_flow_director(qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ }else{
+ transfer_delete_rule_info_to_process0(qtuple->dst_ip,qtuple->src_port,qtuple->dst_port);
+ }
+ }
+
+ if (type == REG_RING_TCP_CONNECT) {
+ uint16_t queue_id = get_protocol_stack()->queue_id;
+ if (get_global_cfg_params()->is_primary){
+ add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_CONNECT);
+ if (queue_id != 0) {
+ config_flow_director(queue_id, qtuple->dst_ip, qtuple->src_ip, qtuple->dst_port, qtuple->src_port);
+ }
+ }else {
+ transfer_create_rule_info_to_process0(queue_id, qtuple->src_ip, qtuple->dst_ip, qtuple->src_port, qtuple->dst_port);
+ }
+ }
+
+ if (type == REG_RING_TCP_LISTEN){
+ if (get_global_cfg_params()->is_primary) {
+ add_user_process_port(qtuple->src_port, get_global_cfg_params()->process_idx, PORT_LISTEN);
+ }else {
+ transfer_add_or_delete_listen_port_to_process0(qtuple->src_port,get_global_cfg_params()->process_idx, 1);
+ }
+ }
+ return 0;
+ }
+
+
int32_t ret;
uint32_t sent_pkts = 0;
void *free_buf[VDEV_REG_QUEUE_SZ];
--
2.23.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。