代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 7e4d7c638681df4f32c0d719c62b4e38ef69c1eb Mon Sep 17 00:00:00 2001
From: Lemmy Huang <huangliming5@huawei.com>
Date: Sat, 31 Aug 2024 14:51:41 +0800
Subject: [PATCH] cleancode: move some API from stack to rpc and rtw
Signed-off-by: Lemmy Huang <huangliming5@huawei.com>
---
src/lstack/api/lstack_epoll.c | 17 +-
src/lstack/api/lstack_rtc_api.c | 7 +-
src/lstack/api/lstack_rtw_api.c | 250 ++++++-
src/lstack/api/lstack_wrap.c | 1 -
src/lstack/core/lstack_dpdk.c | 5 +-
src/lstack/core/lstack_lwip.c | 47 +-
src/lstack/core/lstack_protocol_stack.c | 757 ++-------------------
src/lstack/core/lstack_thread_rpc.c | 610 +++++++++++++----
src/lstack/include/lstack_dpdk.h | 33 +-
src/lstack/include/lstack_epoll.h | 12 +-
src/lstack/include/lstack_lwip.h | 9 +-
src/lstack/include/lstack_protocol_stack.h | 51 +-
src/lstack/include/lstack_rpc_proc.h | 47 --
src/lstack/include/lstack_thread_rpc.h | 81 +--
src/lstack/netif/lstack_ethdev.c | 56 ++
15 files changed, 927 insertions(+), 1056 deletions(-)
delete mode 100644 src/lstack/include/lstack_rpc_proc.h
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index 1c13076..ce3d267 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -19,15 +19,9 @@
#include <stdatomic.h>
#include <pthread.h>
-#include <lwip/lwipgz_sock.h>
#include <lwip/sockets.h>
-#include <lwip/lwipgz_event.h>
-#include <lwip/api.h>
-#include <lwip/tcp.h>
-#include <lwip/timeouts.h>
#include <lwip/lwipgz_posix_api.h>
-#include "lstack_ethdev.h"
#include "lstack_stack_stat.h"
#include "lstack_cfg.h"
#include "lstack_log.h"
@@ -306,6 +300,17 @@ int32_t lstack_epoll_create(int32_t flags)
return lstack_do_epoll_create(fd);
}
+static void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct protocol_stack *stack = NULL;
+
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ rpc_call_clean_epoll(&stack->rpc_queue, wakeup);
+ }
+}
+
int32_t lstack_epoll_close(int32_t fd)
{
struct lwip_sock *sock = lwip_get_socket(fd);
diff --git a/src/lstack/api/lstack_rtc_api.c b/src/lstack/api/lstack_rtc_api.c
index 7689c83..60d3b23 100644
--- a/src/lstack/api/lstack_rtc_api.c
+++ b/src/lstack/api/lstack_rtc_api.c
@@ -42,11 +42,6 @@ static int rtc_close(int s)
return lwip_close(s);
}
-static int rtc_shutdown(int fd, int how)
-{
- return lwip_shutdown(fd, how);
-}
-
static int rtc_epoll_create(int flags)
{
if (stack_setup_app_thread() < 0) {
@@ -90,7 +85,7 @@ static int rtc_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *excep
void rtc_api_init(posix_api_t *api)
{
api->close_fn = rtc_close;
- api->shutdown_fn = rtc_shutdown;
+ api->shutdown_fn = lwip_shutdown;
api->socket_fn = rtc_socket;
api->accept_fn = lwip_accept;
api->accept4_fn = lwip_accept4;
diff --git a/src/lstack/api/lstack_rtw_api.c b/src/lstack/api/lstack_rtw_api.c
index d8634cc..7ceff20 100644
--- a/src/lstack/api/lstack_rtw_api.c
+++ b/src/lstack/api/lstack_rtw_api.c
@@ -13,14 +13,258 @@
#include <lwip/lwipgz_sock.h>
#include <lwip/sockets.h>
+#include "common/gazelle_base_func.h"
+#include "lstack_log.h"
+#include "lstack_cfg.h"
#include "lstack_thread_rpc.h"
-#include "lstack_epoll.h"
#include "lstack_protocol_stack.h"
-#include "lstack_cfg.h"
#include "lstack_lwip.h"
-#include "common/gazelle_base_func.h"
+#include "lstack_epoll.h"
#include "lstack_rtw_api.h"
+/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
+static int stack_broadcast_close(int fd)
+{
+ int ret = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ do {
+ sock = sock->listen_next;
+ if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) {
+ ret = -1;
+ }
+
+ if (POSIX_IS_CLOSED(sock)) {
+ break;
+ }
+ fd = sock->conn->callback_arg.socket;
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
+
+ return ret;
+}
+
+static int stack_broadcast_shutdown(int fd, int how)
+{
+ int32_t ret = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ do {
+ sock = sock->listen_next;
+ if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) {
+ ret = -1;
+ }
+
+ if (POSIX_IS_CLOSED(sock)) {
+ break;
+ }
+ fd = sock->conn->callback_arg.socket;
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
+
+ return ret;
+}
+
+/* choice one stack bind */
+static int stack_single_bind(int fd, const struct sockaddr *name, socklen_t namelen)
+{
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_bind(&stack->rpc_queue, fd, name, namelen);
+}
+
+/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
+static int stack_broadcast_bind(int fd, const struct sockaddr *name, socklen_t namelen)
+{
+ struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+ struct protocol_stack *stack = NULL;
+ int ret, clone_fd;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
+ }
+
+ ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen);
+ if (ret < 0) {
+ close(fd);
+ return ret;
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ for (int i = 0; i < stack_group->stack_num; ++i) {
+ stack = stack_group->stacks[i];
+ if (stack != cur_stack) {
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen);
+ if (clone_fd < 0) {
+ stack_broadcast_close(fd);
+ return clone_fd;
+ }
+ }
+ }
+ return 0;
+}
+
+static void inline del_accept_in_event(struct lwip_sock *sock)
+{
+ pthread_spin_lock(&sock->wakeup->event_list_lock);
+
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock->events &= ~EPOLLIN;
+ if (sock->events == 0) {
+ list_del_node(&sock->event_list);
+ }
+ }
+
+ pthread_spin_unlock(&sock->wakeup->event_list_lock);
+}
+
+static struct lwip_sock *get_min_accept_sock(int fd)
+{
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct lwip_sock *min_sock = NULL;
+
+ while (sock) {
+ if (!NETCONN_IS_ACCEPTIN(sock)) {
+ sock = sock->listen_next;
+ continue;
+ }
+
+ if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
+ min_sock = sock;
+ }
+
+ sock = sock->listen_next;
+ }
+
+ return min_sock;
+}
+
+/* ergodic the protocol stack thread to find the connection, because all threads are listening */
+static int stack_broadcast_accept4(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+ int ret = -1;
+ struct lwip_sock *min_sock = NULL;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ struct protocol_stack *stack = NULL;
+ if (sock == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+
+ if (netconn_is_nonblocking(sock->conn)) {
+ min_sock = get_min_accept_sock(fd);
+ } else {
+ while ((min_sock = get_min_accept_sock(fd)) == NULL) {
+ lstack_block_wait(sock->wakeup, 0);
+ }
+ }
+
+ if (min_sock && min_sock->conn) {
+ stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags);
+ }
+
+ if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
+ del_accept_in_event(min_sock);
+ }
+
+ if (ret < 0) {
+ errno = EAGAIN;
+ }
+ return ret;
+}
+
+static int stack_broadcast_accept(int fd, struct sockaddr *addr, socklen_t *addrlen)
+{
+ if (get_global_cfg_params()->nonblock_mode)
+ return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK);
+ else
+ return stack_broadcast_accept4(fd, addr, addrlen, 0);
+}
+
+/* choice one stack listen */
+static int stack_single_listen(int fd, int backlog)
+{
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_listen(&stack->rpc_queue, fd, backlog);
+}
+
+/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
+static int stack_broadcast_listen(int fd, int backlog)
+{
+ typedef union sockaddr_union {
+ struct sockaddr sa;
+ struct sockaddr_in in;
+ struct sockaddr_in6 in6;
+ } sockaddr_t;
+
+ struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
+ struct protocol_stack *stack = NULL;
+ sockaddr_t addr;
+ socklen_t addr_len = sizeof(addr);
+ int ret, clone_fd;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
+ }
+
+ ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len);
+ if (ret != 0) {
+ return ret;
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ int min_conn_stk_idx = get_min_conn_stack(stack_group);
+
+ for (int32_t i = 0; i < stack_group->stack_num; ++i) {
+ stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
+ continue;
+ }
+ if (stack != cur_stack) {
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len);
+ if (clone_fd < 0) {
+ stack_broadcast_close(fd);
+ return clone_fd;
+ }
+ } else {
+ clone_fd = fd;
+ }
+
+ if (min_conn_stk_idx == i) {
+ lwip_get_socket(clone_fd)->conn->is_master_fd = 1;
+ } else {
+ lwip_get_socket(clone_fd)->conn->is_master_fd = 0;
+ }
+
+ ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog);
+ if (ret < 0) {
+ stack_broadcast_close(fd);
+ return ret;
+ }
+ }
+ return 0;
+}
+
static int rtw_socket(int domain, int type, int protocol)
{
struct protocol_stack *stack = get_bind_protocol_stack();
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 1d5529d..8f80f98 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -14,7 +14,6 @@
#include <ifaddrs.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
-#include <net/if.h>
#include <lwip/lwipgz_posix_api.h>
#include <lwip/lwipgz_sock.h>
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index 50fbdf6..f87e362 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -43,7 +43,6 @@
#include "lstack_log.h"
#include "common/dpdk_common.h"
#include "lstack_protocol_stack.h"
-#include "lstack_thread_rpc.h"
#include "lstack_lwip.h"
#include "lstack_cfg.h"
#include "lstack_virtio.h"
@@ -765,9 +764,9 @@ static int dpdk_bond_create(uint8_t mode, int *slave_port_id, int count)
return 0;
}
-int32_t init_dpdk_ethdev(void)
+int init_dpdk_ethdev(void)
{
- int32_t ret;
+ int ret;
int slave_port_id[GAZELLE_MAX_BOND_NUM];
int port_id = 0;
struct cfg_params *cfg = get_global_cfg_params();
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 89142a4..3454961 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -29,15 +29,12 @@
#include <lwip/prot/etharp.h>
#include "common/gazelle_base_func.h"
-#include "lstack_ethdev.h"
-#include "lstack_protocol_stack.h"
#include "lstack_log.h"
-#include "lstack_dpdk.h"
+#include "lstack_cfg.h"
+#include "lstack_protocol_stack.h"
#include "lstack_stack_stat.h"
#include "lstack_epoll.h"
-#include "lstack_thread_rpc.h"
-#include "common/dpdk_common.h"
-#include "lstack_cfg.h"
+#include "lstack_dpdk.h"
#include "lstack_lwip.h"
static const uint8_t fin_packet = 0;
@@ -841,43 +838,24 @@ ssize_t gazelle_same_node_ring_send(struct lwip_sock *sock, const void *buf, siz
return act_len;
}
-PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0};
-PER_THREAD uint16_t max_sock_stack = 0;
-
-static inline void thread_bind_stack(struct lwip_sock *sock)
-{
- if (likely(sock->already_bind_numa || !sock->stack)) {
- return;
- }
- sock->already_bind_numa = 1;
-
- if (get_global_cfg_params()->app_bind_numa == 0) {
- return;
- }
-
- stack_sock_num[sock->stack->stack_idx]++;
- if (stack_sock_num[sock->stack->stack_idx] > max_sock_stack) {
- max_sock_stack = stack_sock_num[sock->stack->stack_idx];
- bind_to_stack_numa(sock->stack);
- }
-}
-
ssize_t do_lwip_send_to_stack(int32_t fd, const void *buf, size_t len, int32_t flags,
const struct sockaddr *addr, socklen_t addrlen)
{
+ struct lwip_sock *sock;
ssize_t send = 0;
-
+
if (buf == NULL) {
GAZELLE_RETURN(EINVAL);
}
-
if (addr && addr->sa_family != AF_INET && addr->sa_family != AF_INET6) {
GAZELLE_RETURN(EINVAL);
}
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- thread_bind_stack(sock);
+ sock = lwip_get_socket(fd);
+ if (unlikely(sock->already_bind_numa == 0 && sock->stack)) {
+ thread_bind_stack(sock->stack);
+ sock->already_bind_numa = 1;
+ }
if (sock->same_node_tx_ring != NULL) {
return gazelle_same_node_ring_send(sock, buf, len, flags);
@@ -1131,7 +1109,10 @@ ssize_t do_lwip_read_from_stack(int32_t fd, void *buf, size_t len, int32_t flags
return -1;
}
- thread_bind_stack(sock);
+ if (unlikely(sock->already_bind_numa == 0 && sock->stack)) {
+ thread_bind_stack(sock->stack);
+ sock->already_bind_numa = 1;
+ }
if (sock->same_node_rx_ring != NULL) {
return gazelle_same_node_ring_recv(sock, buf, len, flags);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index f56e911..bcca1a7 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -22,14 +22,12 @@
#include <lwip/lwipgz_posix_api.h>
#include "common/gazelle_base_func.h"
-#include "lstack_thread_rpc.h"
#include "common/dpdk_common.h"
#include "lstack_log.h"
+#include "lstack_cfg.h"
#include "lstack_dpdk.h"
#include "lstack_ethdev.h"
-#include "lstack_vdev.h"
#include "lstack_lwip.h"
-#include "lstack_cfg.h"
#include "lstack_control_plane.h"
#include "lstack_epoll.h"
#include "lstack_stack_stat.h"
@@ -64,18 +62,6 @@ static void stack_wait_quit(struct protocol_stack *stack)
}
}
-void bind_to_stack_numa(struct protocol_stack *stack)
-{
- int32_t ret;
- pthread_t tid = pthread_self();
-
- ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
- if (ret != 0) {
- LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id);
- return;
- }
-}
-
static inline void set_stack_idx(uint16_t idx)
{
g_stack_p = g_stack_group.stacks[idx];
@@ -97,27 +83,6 @@ struct protocol_stack_group *get_protocol_stack_group(void)
return &g_stack_group;
}
-int get_min_conn_stack(struct protocol_stack_group *stack_group)
-{
- int min_conn_stk_idx = 0;
- int min_conn_num = GAZELLE_MAX_CLIENTS;
- for (int i = 0; i < stack_group->stack_num; i++) {
- struct protocol_stack* stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv) {
- if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
- min_conn_stk_idx = i;
- min_conn_num = stack->conn_num;
- }
- } else {
- if (stack->conn_num < min_conn_num) {
- min_conn_stk_idx = i;
- min_conn_num = stack->conn_num;
- }
- }
- }
- return min_conn_stk_idx;
-}
-
struct protocol_stack *get_protocol_stack(void)
{
return g_stack_p;
@@ -179,6 +144,57 @@ struct protocol_stack *get_bind_protocol_stack(void)
return stack_group->stacks[index];
}
+int get_min_conn_stack(struct protocol_stack_group *stack_group)
+{
+ struct protocol_stack* stack;
+ int min_conn_stk_idx = 0;
+ int min_conn_num = GAZELLE_MAX_CLIENTS;
+
+ for (int i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ if (get_global_cfg_params()->seperate_send_recv) {
+ if (!stack->is_send_thread && stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ } else {
+ if (stack->conn_num < min_conn_num) {
+ min_conn_stk_idx = i;
+ min_conn_num = stack->conn_num;
+ }
+ }
+ }
+ return min_conn_stk_idx;
+}
+
+void bind_to_stack_numa(struct protocol_stack *stack)
+{
+ int32_t ret;
+ pthread_t tid = pthread_self();
+
+ ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id);
+ return;
+ }
+}
+
+void thread_bind_stack(struct protocol_stack *stack)
+{
+ static PER_THREAD uint16_t stack_sock_num[GAZELLE_MAX_STACK_NUM] = {0};
+ static PER_THREAD uint16_t max_sock_stack = 0;
+
+ if (get_global_cfg_params()->app_bind_numa == 0) {
+ return;
+ }
+
+ stack_sock_num[stack->stack_idx]++;
+ if (stack_sock_num[stack->stack_idx] > max_sock_stack) {
+ max_sock_stack = stack_sock_num[stack->stack_idx];
+ bind_to_stack_numa(stack);
+ }
+}
+
static uint32_t get_protocol_traffic(struct protocol_stack *stack)
{
if (use_ltran()) {
@@ -232,6 +248,11 @@ void low_power_idling(struct protocol_stack *stack)
}
}
+struct thread_params {
+ uint16_t queue_id;
+ uint16_t idx;
+};
+
static int32_t create_thread(void *arg, char *thread_name, stack_thread_func func)
{
/* thread may run slow, if arg is temp var maybe have relese */
@@ -373,7 +394,7 @@ static int32_t init_stack_value(struct protocol_stack *stack, void *arg)
return 0;
}
-void wait_sem_value(sem_t *sem, int32_t wait_value)
+static void wait_sem_value(sem_t *sem, int32_t wait_value)
{
int32_t sem_val;
do {
@@ -546,7 +567,7 @@ static void* gazelle_stack_thread(void *arg)
return NULL;
}
-int32_t stack_group_init_mempool(void)
+static int stack_group_init_mempool(void)
{
struct cfg_params *cfg_params = get_global_cfg_params();
uint32_t total_mbufs = 0;
@@ -702,655 +723,9 @@ OUT2:
return -1;
}
-void stack_arp(struct rpc_msg *msg)
-{
- struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p;
- struct protocol_stack *stack = get_protocol_stack();
-
- eth_dev_recv(mbuf, stack);
-}
-
-void stack_socket(struct rpc_msg *msg)
-{
- msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
- if (msg->result < 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
- }
-}
-
-void stack_close(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct lwip_sock *sock = lwip_get_socket(fd);
-
- if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
- return;
- }
-
- msg->result = lwip_close(fd);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_shutdown(struct rpc_msg *msg)
-{
- int fd = msg->args[MSG_ARG_0].i;
- int how = msg->args[MSG_ARG_1].i;
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct lwip_sock *sock = lwip_get_socket(fd);
-
- if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- return;
- }
-
- msg->result = lwip_shutdown(fd, how);
- if (msg->result != 0 && errno != ENOTCONN) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result);
- }
-
- posix_api->shutdown_fn(fd, how);
-}
-
-void stack_bind(struct rpc_msg *msg)
-{
- msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_listen(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- int32_t backlog = msg->args[MSG_ARG_1].i;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL) {
- msg->result = -1;
- return;
- }
-
- /* new listen add to stack listen list */
- msg->result = lwip_listen(fd, backlog);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_accept(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- msg->result = -1;
- struct protocol_stack *stack = get_protocol_stack();
-
- int32_t accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, msg->args[MSG_ARG_3].i);
- if (accept_fd < 0) {
- stack->stats.accept_fail++;
- LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
- return;
- }
-
- struct lwip_sock *sock = lwip_get_socket(accept_fd);
- if (sock == NULL || sock->stack == NULL) {
- lwip_close(accept_fd);
- LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
- return;
- }
-
- msg->result = accept_fd;
- sock->stack->conn_num++;
- if (rte_ring_count(sock->conn->recvmbox->ring)) {
- do_lwip_add_recvlist(accept_fd);
- }
-}
-
-void stack_connect(struct rpc_msg *msg)
-{
- msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
- if (msg->result < 0) {
- msg->result = -errno;
- }
-}
-
-void stack_getpeername(struct rpc_msg *msg)
-{
- msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_getsockname(struct rpc_msg *msg)
-{
- msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_getsockopt(struct rpc_msg *msg)
-{
- msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
- msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
- }
-}
-
-void stack_setsockopt(struct rpc_msg *msg)
-{
- msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
- msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
- msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
- }
-}
-
-void stack_fcntl(struct rpc_msg *msg)
-{
- msg->result = lwip_fcntl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].l);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_ioctl(struct rpc_msg *msg)
-{
- msg->result = lwip_ioctl(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].l, msg->args[MSG_ARG_2].p);
- if (msg->result != 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
- }
-}
-
-void stack_recv(struct rpc_msg *msg)
-{
- msg->result = lwip_recv(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].size,
- msg->args[MSG_ARG_3].i);
-}
-
-void stack_tcp_send(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
- struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
- return;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if (replenish_again < 0) {
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
- }
-
- if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
- if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- return;
- }
- }
-
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
-}
-
-void stack_udp_send(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- size_t len = msg->args[MSG_ARG_1].size;
- struct protocol_stack *stack = get_protocol_stack();
- int replenish_again;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (POSIX_IS_CLOSED(sock)) {
- msg->result = -1;
- return;
- }
-
- if (get_protocol_stack_group()->latency_start) {
- calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
- if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
- rpc_call_replenish(&stack->rpc_queue, sock);
- return;
- }
-
- __sync_fetch_and_sub(&sock->call_num, 1);
- return;
-}
-
-/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
-void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
-{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct rte_mbuf *mbuf_copy = NULL;
- struct protocol_stack *stack = NULL;
- int32_t ret;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- if (cur_stack == stack) {
- continue;
- }
-
- /* stack maybe not init in app thread yet */
- if (stack == NULL || !(netif_is_up(&stack->netif))) {
- continue;
- }
-
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
-
- ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
- if (ret != 0) {
- rte_pktmbuf_free(mbuf_copy);
- return;
- }
- }
-#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
- if (get_global_cfg_params()->kni_switch) {
- ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- cur_stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
- kni_handle_tx(mbuf_copy);
- }
-#endif
- if (get_global_cfg_params()->flow_bifurcation) {
- ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- if (ret != 0) {
- cur_stack->stats.rx_allocmbuf_fail++;
- return;
- }
- copy_mbuf(mbuf_copy, mbuf);
- virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy);
- }
- return;
-}
-
-void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
-{
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct protocol_stack *stack = NULL;
-
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- rpc_call_clean_epoll(&stack->rpc_queue, wakeup);
- }
-}
-
-void stack_clean_epoll(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
-
- list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
-}
-
-void stack_mempool_size(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
-
- msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool);
-}
-
-void stack_create_shadow_fd(struct rpc_msg *msg)
-{
- int32_t fd = msg->args[MSG_ARG_0].i;
- struct sockaddr *addr = msg->args[MSG_ARG_1].p;
- socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
-
- int32_t clone_fd = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
- msg->result = -1;
- return;
- }
-
- int domain = addr->sa_family;
- int type = NETCONN_IS_UDP(sock) ? SOCK_DGRAM : SOCK_STREAM;
- clone_fd = lwip_socket(domain, type, 0);
- if (clone_fd < 0) {
- LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
- msg->result = clone_fd;
- return;
- }
-
- struct lwip_sock *clone_sock = lwip_get_socket(clone_fd);
- if (clone_sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
- msg->result = -1;
- return;
- }
-
- do_lwip_clone_sockopt(clone_sock, sock);
-
- while (sock->listen_next) {
- sock = sock->listen_next;
- }
- sock->listen_next = clone_sock;
-
- int32_t ret = lwip_bind(clone_fd, addr, addr_len);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno);
- msg->result = ret;
- return;
- }
-
- msg->result = clone_fd;
-}
-
-void stack_replenish_sendring(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
-
- msg->result = do_lwip_replenish_sendring(stack, sock);
- if (msg->result == true) {
- msg->recall_flag = 1;
- rpc_call(&stack->rpc_queue, msg);
- }
-}
-
-void stack_get_conntable(struct rpc_msg *msg)
-{
- struct gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p;
- uint32_t max_num = msg->args[MSG_ARG_1].u;
-
- msg->result = do_lwip_get_conntable(conn, max_num);
-}
-
-void stack_get_connnum(struct rpc_msg *msg)
-{
- msg->result = do_lwip_get_connnum();
-}
-
-void stack_recvlist_count(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- struct list_node *list = &stack->recv_list;
- uint32_t count = 0;
- struct list_node *node;
- struct list_node *temp;
-
- list_for_each_node(node, temp, list) {
- count++;
- }
-
- msg->result = count;
-}
-
-/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
-int32_t stack_broadcast_close(int32_t fd)
-{
- int32_t ret = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- do {
- sock = sock->listen_next;
- if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) {
- ret = -1;
- }
-
- if (POSIX_IS_CLOSED(sock)) {
- break;
- }
- fd = sock->conn->callback_arg.socket;
- stack = get_protocol_stack_by_fd(fd);
- } while (1);
-
- return ret;
-}
-
-int stack_broadcast_shutdown(int fd, int how)
-{
- int32_t ret = 0;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- do {
- sock = sock->listen_next;
- if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) {
- ret = -1;
- }
-
- if (POSIX_IS_CLOSED(sock)) {
- break;
- }
- fd = sock->conn->callback_arg.socket;
- stack = get_protocol_stack_by_fd(fd);
- } while (1);
-
- return ret;
-}
-
-/* choice one stack listen */
-int32_t stack_single_listen(int32_t fd, int32_t backlog)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- return rpc_call_listen(&stack->rpc_queue, fd, backlog);
-}
-
-/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
-{
- typedef union sockaddr_union {
- struct sockaddr sa;
- struct sockaddr_in in;
- struct sockaddr_in6 in6;
- } sockaddr_t;
-
- struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
- struct protocol_stack *stack = NULL;
- sockaddr_t addr;
- socklen_t addr_len = sizeof(addr);
- int32_t ret, clone_fd;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL || cur_stack == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EBADF);
- }
-
- ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len);
- if (ret != 0) {
- return ret;
- }
-
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- int min_conn_stk_idx = get_min_conn_stack(stack_group);
-
- for (int32_t i = 0; i < stack_group->stack_num; ++i) {
- stack = stack_group->stacks[i];
- if (get_global_cfg_params()->seperate_send_recv && stack->is_send_thread) {
- continue;
- }
- if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len);
- if (clone_fd < 0) {
- stack_broadcast_close(fd);
- return clone_fd;
- }
- } else {
- clone_fd = fd;
- }
-
- if (min_conn_stk_idx == i) {
- lwip_get_socket(clone_fd)->conn->is_master_fd = 1;
- } else {
- lwip_get_socket(clone_fd)->conn->is_master_fd = 0;
- }
-
- ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog);
- if (ret < 0) {
- stack_broadcast_close(fd);
- return ret;
- }
- }
- return 0;
-}
-
-static struct lwip_sock *get_min_accept_sock(int32_t fd)
-{
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct lwip_sock *min_sock = NULL;
-
- while (sock) {
- if (!NETCONN_IS_ACCEPTIN(sock)) {
- sock = sock->listen_next;
- continue;
- }
-
- if (min_sock == NULL || min_sock->stack->conn_num > sock->stack->conn_num) {
- min_sock = sock;
- }
-
- sock = sock->listen_next;
- }
-
- return min_sock;
-}
-
-static void inline del_accept_in_event(struct lwip_sock *sock)
-{
- pthread_spin_lock(&sock->wakeup->event_list_lock);
-
- if (!NETCONN_IS_ACCEPTIN(sock)) {
- sock->events &= ~EPOLLIN;
- if (sock->events == 0) {
- list_del_node(&sock->event_list);
- }
- }
-
- pthread_spin_unlock(&sock->wakeup->event_list_lock);
-}
-
-/* choice one stack bind */
-int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
-{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- return rpc_call_bind(&stack->rpc_queue, fd, name, namelen);
-}
-
-/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
-{
- struct protocol_stack *cur_stack = get_protocol_stack_by_fd(fd);
- struct protocol_stack *stack = NULL;
- int32_t ret, clone_fd;
-
- struct lwip_sock *sock = lwip_get_socket(fd);
- if (sock == NULL || cur_stack == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EBADF);
- }
-
- ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen);
- if (ret < 0) {
- close(fd);
- return ret;
- }
-
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- for (int32_t i = 0; i < stack_group->stack_num; ++i) {
- stack = stack_group->stacks[i];
- if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen);
- if (clone_fd < 0) {
- stack_broadcast_close(fd);
- return clone_fd;
- }
- }
- }
- return 0;
-}
-
-/* ergodic the protocol stack thread to find the connection, because all threads are listening */
-int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
-{
- int32_t ret = -1;
- struct lwip_sock *min_sock = NULL;
- struct lwip_sock *sock = lwip_get_socket(fd);
- struct protocol_stack *stack = NULL;
- if (sock == NULL) {
- GAZELLE_RETURN(EBADF);
- }
-
- if (netconn_is_nonblocking(sock->conn)) {
- min_sock = get_min_accept_sock(fd);
- } else {
- while ((min_sock = get_min_accept_sock(fd)) == NULL) {
- lstack_block_wait(sock->wakeup, 0);
- }
- }
-
- if (min_sock && min_sock->conn) {
- stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket);
- if (stack == NULL) {
- GAZELLE_RETURN(EBADF);
- }
- ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags);
- }
-
- if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
- del_accept_in_event(min_sock);
- }
-
- if (ret < 0) {
- errno = EAGAIN;
- }
- return ret;
-}
-
-int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen)
-{
- if (get_global_cfg_params()->nonblock_mode)
- return stack_broadcast_accept4(fd, addr, addrlen, O_NONBLOCK);
- else
- return stack_broadcast_accept4(fd, addr, addrlen, 0);
-}
-
-static void stack_all_fds_close(void)
+void stack_exit(void)
{
+ /* close all fd */
for (int i = 3; i < GAZELLE_MAX_CLIENTS + GAZELLE_RESERVED_CLIENTS; i++) {
struct lwip_sock *sock = lwip_get_socket(i);
if (!POSIX_IS_CLOSED(sock) && sock->stack == get_protocol_stack()) {
@@ -1359,16 +734,6 @@ static void stack_all_fds_close(void)
}
}
-static void stack_exit(void)
-{
- stack_all_fds_close();
-}
-
-void stack_exit_by_rpc(struct rpc_msg *msg)
-{
- stack_exit();
-}
-
void stack_group_exit(void)
{
int i;
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 8ac06cb..3e9889a 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -13,22 +13,27 @@
#include <lwip/lwipgz_sock.h>
#include <rte_mempool.h>
+#include "lwip/lwipgz_posix_api.h"
+
#include "lstack_log.h"
#include "lstack_cfg.h"
#include "lstack_dpdk.h"
-#include "lstack_rpc_proc.h"
#include "lstack_stack_stat.h"
#include "lstack_protocol_stack.h"
#include "lstack_thread_rpc.h"
+#include "lstack_epoll.h"
+#include "lstack_lwip.h"
static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
static struct rpc_stats g_rpc_stats;
+
struct rpc_stats *rpc_stats_get(void)
{
return &g_rpc_stats;
}
-static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
+__rte_always_inline
+static struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
{
int ret;
struct rpc_msg *msg = NULL;
@@ -42,23 +47,11 @@ static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct
static void rpc_msg_init(struct rpc_msg *msg, rpc_msg_func func, struct rpc_msg_pool *pool)
{
- msg->rpcpool = pool;
- pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
- msg->func = func;
- msg->sync_flag = 1;
+ msg->func = func;
+ msg->rpcpool = pool;
+ msg->sync_flag = 1;
msg->recall_flag = 0;
-}
-
-static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
-{
- struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
- if (msg == NULL) {
- return NULL;
- }
-
- rpc_msg_init(msg, func, NULL);
-
- return msg;
+ pthread_spin_init(&msg->lock, PTHREAD_PROCESS_PRIVATE);
}
static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
@@ -92,9 +85,27 @@ static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
return msg;
}
-static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
+__rte_always_inline
+static void rpc_msg_free(struct rpc_msg *msg)
+{
+ pthread_spin_destroy(&msg->lock);
+ if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) {
+ rte_mempool_put(msg->rpcpool->mempool, (void *)msg);
+ } else {
+ free(msg);
+ }
+}
+
+__rte_always_inline
+static void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
{
- int32_t ret;
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
+}
+
+__rte_always_inline
+static int rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
+{
+ int ret;
pthread_spin_trylock(&msg->lock);
rpc_call(queue, msg);
@@ -107,12 +118,39 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *qu
return ret;
}
-int32_t rpc_msgcnt(rpc_queue *queue)
+int rpc_msgcnt(rpc_queue *queue)
{
return lockless_queue_count(queue);
}
-int rpc_poll_msg(rpc_queue *queue, uint32_t max_num)
+static struct rpc_msg *rpc_msg_alloc_except(rpc_msg_func func)
+{
+ struct rpc_msg *msg = calloc(1, sizeof(struct rpc_msg));
+ if (msg == NULL) {
+ return NULL;
+ }
+
+ rpc_msg_init(msg, func, NULL);
+ return msg;
+}
+
+static void stack_exit_by_rpc(struct rpc_msg *msg)
+{
+ stack_exit();
+}
+
+int rpc_call_stack_exit(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ rpc_call(queue, msg);
+ return 0;
+}
+
+int rpc_poll_msg(rpc_queue *queue, int max_num)
{
int force_quit = 0;
struct rpc_msg *msg = NULL;
@@ -149,101 +187,165 @@ int rpc_poll_msg(rpc_queue *queue, uint32_t max_num)
return force_quit;
}
-int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn)
+
+static void callback_socket(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_get_conntable);
- if (msg == NULL) {
- return -1;
+ msg->result = lwip_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
+ if (msg->result < 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
}
-
- msg->args[MSG_ARG_0].p = conn_table;
- msg->args[MSG_ARG_1].u = max_conn;
-
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connnum(rpc_queue *queue)
+static void callback_close(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_get_connnum);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ struct lwip_sock *sock = lwip_get_socket(fd);
+
+ if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg); /* until stack_send recall finish */
+ return;
}
- return rpc_sync_call(queue, msg);
+ msg->result = lwip_close(fd);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
}
-int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+static void callback_shutdown(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_create_shadow_fd);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ int how = msg->args[MSG_ARG_1].i;
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ struct lwip_sock *sock = lwip_get_socket(fd);
+
+ if (sock && __atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) > 0) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->result = lwip_shutdown(fd, how);
+ if (msg->result != 0 && errno != ENOTCONN) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), fd, msg->result);
+ }
- return rpc_sync_call(queue, msg);
+ posix_api->shutdown_fn(fd, how);
}
-int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn)
+static void callback_bind(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1);
- if (msg == NULL) {
- return -1;
+ msg->result = lwip_bind(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].cp, msg->args[MSG_ARG_2].socklen);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
- msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn)
+static void callback_listen(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ int backlog = msg->args[MSG_ARG_1].i;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL) {
+ msg->result = -1;
+ return;
+ }
+
+ /* new listen add to stack listen list */
+ msg->result = lwip_listen(fd, backlog);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d failed %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
}
- msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_mbufpoolsize(rpc_queue *queue)
+static void callback_create_shadow_fd(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_mempool_size);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ struct sockaddr *addr = msg->args[MSG_ARG_1].p;
+ socklen_t addr_len = msg->args[MSG_ARG_2].socklen;
+
+ int clone_fd = 0;
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d\n", fd);
+ msg->result = -1;
+ return;
}
- return rpc_sync_call(queue, msg);
-}
+ int domain = addr->sa_family;
+ int type = NETCONN_IS_UDP(sock) ? SOCK_DGRAM : SOCK_STREAM;
+ clone_fd = lwip_socket(domain, type, 0);
+ if (clone_fd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "clone socket failed clone_fd=%d errno=%d\n", clone_fd, errno);
+ msg->result = clone_fd;
+ return;
+ }
-int32_t rpc_call_recvlistcnt(rpc_queue *queue)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack_recvlist_count);
- if (msg == NULL) {
- return -1;
+ struct lwip_sock *clone_sock = lwip_get_socket(clone_fd);
+ if (clone_sock == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "get sock null fd=%d clone_fd=%d\n", fd, clone_fd);
+ msg->result = -1;
+ return;
}
- return rpc_sync_call(queue, msg);
+ do_lwip_clone_sockopt(clone_sock, sock);
+
+ while (sock->listen_next) {
+ sock = sock->listen_next;
+ }
+ sock->listen_next = clone_sock;
+
+ int ret = lwip_bind(clone_fd, addr, addr_len);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, LSTACK, "clone bind failed clone_fd=%d errno=%d\n", clone_fd, errno);
+ msg->result = ret;
+ return;
+ }
+
+ msg->result = clone_fd;
}
-int32_t rpc_call_arp(rpc_queue *queue, void *mbuf)
+static void callback_accept(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_arp);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ msg->result = -1;
+ struct protocol_stack *stack = get_protocol_stack();
+
+ int accept_fd = lwip_accept4(fd, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p, msg->args[MSG_ARG_3].i);
+ if (accept_fd < 0) {
+ stack->stats.accept_fail++;
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
}
- msg->sync_flag = 0;
- msg->args[MSG_ARG_0].p = mbuf;
+ struct lwip_sock *sock = lwip_get_socket(accept_fd);
+ if (sock == NULL || sock->stack == NULL) {
+ lwip_close(accept_fd);
+ LSTACK_LOG(ERR, LSTACK, "fd %d ret %d\n", fd, accept_fd);
+ return;
+ }
- rpc_call(queue, msg);
+ msg->result = accept_fd;
+ sock->stack->conn_num++;
+ if (rte_ring_count(sock->conn->recvmbox->ring)) {
+ do_lwip_add_recvlist(accept_fd);
+ }
+}
- return 0;
+static void callback_connect(struct rpc_msg *msg)
+{
+ msg->result = lwip_connect(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].socklen);
+ if (msg->result < 0) {
+ msg->result = -errno;
+ }
}
-int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol)
+int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_socket);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_socket);
if (msg == NULL) {
return -1;
}
@@ -255,9 +357,9 @@ int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_close(rpc_queue *queue, int fd)
+int rpc_call_close(rpc_queue *queue, int fd)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_close);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_close);
if (msg == NULL) {
return -1;
}
@@ -267,20 +369,9 @@ int32_t rpc_call_close(rpc_queue *queue, int fd)
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_stack_exit(rpc_queue *queue)
+int rpc_call_shutdown(rpc_queue *queue, int fd, int how)
{
- struct rpc_msg *msg = rpc_msg_alloc_except(stack_exit_by_rpc);
- if (msg == NULL) {
- return -1;
- }
-
- rpc_call(queue, msg);
- return 0;
-}
-
-int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack_shutdown);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_shutdown);
if (msg == NULL) {
return -1;
}
@@ -291,48 +382,50 @@ int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how)
return rpc_sync_call(queue, msg);
}
-void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_clean_epoll);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_bind);
if (msg == NULL) {
- return;
+ return -1;
}
- msg->args[MSG_ARG_0].p = wakeup;
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].cp = addr;
+ msg->args[MSG_ARG_2].socklen = addrlen;
- rpc_sync_call(queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+int rpc_call_listen(rpc_queue *queue, int s, int backlog)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_bind);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_listen);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].cp = addr;
- msg->args[MSG_ARG_2].socklen = addrlen;
+ msg->args[MSG_ARG_0].i = s;
+ msg->args[MSG_ARG_1].i = backlog;
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog)
+int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_listen);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_create_shadow_fd);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].i = s;
- msg->args[MSG_ARG_1].i = backlog;
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].cp = addr;
+ msg->args[MSG_ARG_2].socklen = addrlen;
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_accept);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_accept);
if (msg == NULL) {
return -1;
}
@@ -345,9 +438,9 @@ int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
+int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_connect);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_connect);
if (msg == NULL) {
return -1;
}
@@ -356,7 +449,7 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr,
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- int32_t ret = rpc_sync_call(queue, msg);
+ int ret = rpc_sync_call(queue, msg);
if (ret < 0) {
errno = -ret;
return -1;
@@ -364,9 +457,45 @@ int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr,
return ret;
}
-int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+static void callback_getpeername(struct rpc_msg *msg)
+{
+ msg->result = lwip_getpeername(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
+}
+
+static void callback_getsockname(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getpeername);
+ msg->result = lwip_getsockname(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].p, msg->args[MSG_ARG_2].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d fail %ld\n", get_stack_tid(), msg->args[MSG_ARG_0].i, msg->result);
+ }
+}
+
+static void callback_getsockopt(struct rpc_msg *msg)
+{
+ msg->result = lwip_getsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
+ msg->args[MSG_ARG_3].p, msg->args[MSG_ARG_4].p);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
+ msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
+ }
+}
+
+static void callback_setsockopt(struct rpc_msg *msg)
+{
+ msg->result = lwip_setsockopt(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i,
+ msg->args[MSG_ARG_3].cp, msg->args[MSG_ARG_4].socklen);
+ if (msg->result != 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, fd %d, level %d, optname %d, fail %ld\n", get_stack_tid(),
+ msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i, msg->result);
+ }
+}
+
+int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getpeername);
if (msg == NULL) {
return -1;
}
@@ -378,9 +507,9 @@ int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, so
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
+int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getsockname);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getsockname);
if (msg == NULL) {
return -1;
}
@@ -392,9 +521,9 @@ int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, so
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen)
+int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_getsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_getsockopt);
if (msg == NULL) {
return -1;
}
@@ -408,9 +537,9 @@ int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, vo
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen)
+int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_setsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_setsockopt);
if (msg == NULL) {
return -1;
}
@@ -424,51 +553,91 @@ int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, co
return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val)
+static void callback_tcp_send(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_fcntl);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ size_t len = msg->args[MSG_ARG_1].size;
+ struct protocol_stack *stack = get_protocol_stack();
+ int replenish_again;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (POSIX_IS_CLOSED(sock)) {
+ msg->result = -1;
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].i = cmd;
- msg->args[MSG_ARG_2].l = val;
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
+ }
- return rpc_sync_call(queue, msg);
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
+ if (replenish_again < 0) {
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
+ }
+
+ if (NETCONN_IS_DATAOUT(sock) || replenish_again > 0) {
+ if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ return;
+ }
+ }
+
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
}
-int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp)
+static void callback_udp_send(struct rpc_msg *msg)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_ioctl);
- if (msg == NULL) {
- return -1;
+ int fd = msg->args[MSG_ARG_0].i;
+ size_t len = msg->args[MSG_ARG_1].size;
+ struct protocol_stack *stack = get_protocol_stack();
+ int replenish_again;
+
+ struct lwip_sock *sock = lwip_get_socket(fd);
+ if (POSIX_IS_CLOSED(sock)) {
+ msg->result = -1;
+ return;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].l = cmd;
- msg->args[MSG_ARG_2].p = argp;
+ if (get_protocol_stack_group()->latency_start) {
+ calculate_sock_latency(&stack->latency, sock, GAZELLE_LATENCY_WRITE_RPC_MSG);
+ }
- return rpc_sync_call(queue, msg);
+ replenish_again = do_lwip_send(stack, sock->conn->callback_arg.socket, sock, len, 0);
+ if ((replenish_again > 0) && (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 1)) {
+ rpc_call_replenish(&stack->rpc_queue, sock);
+ return;
+ }
+
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ return;
}
-int32_t rpc_call_replenish(rpc_queue *queue, void *sock)
+int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_replenish_sendring);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_udp_send);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].p = sock;
+ if (get_protocol_stack_group()->latency_start) {
+ time_stamp_into_rpcmsg(lwip_get_socket(fd));
+ }
+
+ msg->args[MSG_ARG_0].i = fd;
+ msg->args[MSG_ARG_1].size = len;
+ msg->args[MSG_ARG_2].i = flags;
msg->sync_flag = 0;
rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
+int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_tcp_send);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_tcp_send);
if (msg == NULL) {
return -1;
}
@@ -483,28 +652,173 @@ int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags)
msg->sync_flag = 0;
rpc_call(queue, msg);
+ return 0;
+}
+
+static void callback_replenish_sendring(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
+
+ msg->result = do_lwip_replenish_sendring(stack, sock);
+ if (msg->result == true) {
+ msg->recall_flag = 1;
+ rpc_call(&stack->rpc_queue, msg);
+ }
+}
+
+int rpc_call_replenish(rpc_queue *queue, void *sock)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_replenish_sendring);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = sock;
+ msg->sync_flag = 0;
+
+ rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags)
+static void callback_recvlist_count(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct list_node *list = &stack->recv_list;
+ int count = 0;
+ struct list_node *node;
+ struct list_node *temp;
+
+ list_for_each_node(node, temp, list) {
+ count++;
+ }
+ msg->result = count;
+}
+
+int rpc_call_recvlistcnt(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack_udp_send);
+ struct rpc_msg *msg = rpc_msg_alloc(callback_recvlist_count);
if (msg == NULL) {
return -1;
}
- if (get_protocol_stack_group()->latency_start) {
- time_stamp_into_rpcmsg(lwip_get_socket(fd));
+ return rpc_sync_call(queue, msg);
+}
+
+static void callback_clean_epoll(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
+
+ list_del_node(&wakeup->wakeup_list[stack->stack_idx]);
+}
+
+void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_clean_epoll);
+ if (msg == NULL) {
+ return;
+ }
+
+ msg->args[MSG_ARG_0].p = wakeup;
+
+ rpc_sync_call(queue, msg);
+}
+
+static void callback_arp(struct rpc_msg *msg)
+{
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p;
+ struct protocol_stack *stack = get_protocol_stack();
+
+ eth_dev_recv(mbuf, stack);
+}
+
+int rpc_call_arp(rpc_queue *queue, void *mbuf)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_arp);
+ if (msg == NULL) {
+ return -1;
}
- msg->args[MSG_ARG_0].i = fd;
- msg->args[MSG_ARG_1].size = len;
- msg->args[MSG_ARG_2].i = flags;
msg->sync_flag = 0;
+ msg->args[MSG_ARG_0].p = mbuf;
rpc_call(queue, msg);
return 0;
}
+static void callback_mempool_size(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+
+ msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool);
+}
+
+static void callback_get_conntable(struct rpc_msg *msg)
+{
+ struct gazelle_stat_lstack_conn_info *conn = (struct gazelle_stat_lstack_conn_info *)msg->args[MSG_ARG_0].p;
+ unsigned max_num = msg->args[MSG_ARG_1].u;
+
+ msg->result = do_lwip_get_conntable(conn, max_num);
+}
+
+static void callback_get_connnum(struct rpc_msg *msg)
+{
+ msg->result = do_lwip_get_connnum();
+}
+
+int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_get_conntable);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ msg->args[MSG_ARG_0].p = conn_table;
+ msg->args[MSG_ARG_1].u = max_conn;
+
+ return rpc_sync_call(queue, msg);
+}
+
+int rpc_call_connnum(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_get_connnum);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(queue, msg);
+}
+
+int rpc_call_mbufpoolsize(rpc_queue *queue)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(callback_mempool_size);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(queue, msg);
+}
+
+extern void thread_register_phase1(struct rpc_msg *msg);
+int rpc_call_thread_regphase1(rpc_queue *queue, void *conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = conn;
+ return rpc_sync_call(queue, msg);
+}
+
+extern void thread_register_phase2(struct rpc_msg *msg);
+int rpc_call_thread_regphase2(rpc_queue *queue, void *conn)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2);
+ if (msg == NULL) {
+ return -1;
+ }
+ msg->args[MSG_ARG_0].p = conn;
+ return rpc_sync_call(queue, msg);
+}
diff --git a/src/lstack/include/lstack_dpdk.h b/src/lstack/include/lstack_dpdk.h
index d058409..965a0cb 100644
--- a/src/lstack/include/lstack_dpdk.h
+++ b/src/lstack/include/lstack_dpdk.h
@@ -13,7 +13,10 @@
#ifndef _GAZELLE_DPDK_H_
#define _GAZELLE_DPDK_H_
-#include <lwip/lwipgz_flow.h>
+#include <rte_ring.h>
+#include <rte_mbuf.h>
+#include <rte_mempool.h>
+
#include "common/gazelle_opt.h"
#include "common/gazelle_dfx_msg.h"
@@ -32,32 +35,34 @@
*/
#define MBUF_MAX_NUM 0xfffffff
+struct protocol_stack;
+
+int32_t dpdk_eal_init(void);
+void lstack_log_level_init(void);
+
+int dpdk_ethdev_init(int port_id);
+int dpdk_ethdev_start(void);
+int init_dpdk_ethdev(void);
+
int thread_affinity_default(void);
int thread_affinity_init(int cpu_id);
-struct protocol_stack;
-struct rte_mempool;
-struct rte_ring;
-struct rte_mbuf;
+int32_t create_shared_ring(struct protocol_stack *stack);
int32_t fill_mbuf_to_ring(struct rte_mempool *mempool, struct rte_ring *ring, uint32_t mbuf_num);
-int32_t dpdk_eal_init(void);
int32_t pktmbuf_pool_init(struct protocol_stack *stack);
struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t size,
uint32_t flags, int32_t idx);
-int32_t create_shared_ring(struct protocol_stack *stack);
-void lstack_log_level_init(void);
-int dpdk_ethdev_init(int port_id);
-int dpdk_ethdev_start(void);
+struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
+ uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id);
+int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve);
+
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
void dpdk_skip_nic_init(void);
void dpdk_restore_pci(void);
#endif
int32_t dpdk_init_lstack_kni(void);
-bool port_in_stack_queue(gz_addr_t *src_ip, gz_addr_t *dst_ip, uint16_t src_port, uint16_t dst_port);
-struct rte_mempool *create_pktmbuf_mempool(const char *name, uint32_t nb_mbuf,
- uint32_t mbuf_cache_size, uint16_t queue_id, unsigned numa_id);
void dpdk_nic_xstats_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id);
-int32_t dpdk_alloc_pktmbuf(struct rte_mempool *pool, struct rte_mbuf **mbufs, uint32_t num, bool reserve);
void dpdk_nic_features_get(struct gazelle_stack_dfx_data *dfx, uint16_t port_id);
+
#endif /* GAZELLE_DPDK_H */
diff --git a/src/lstack/include/lstack_epoll.h b/src/lstack/include/lstack_epoll.h
index 6e02615..e7ae26b 100644
--- a/src/lstack/include/lstack_epoll.h
+++ b/src/lstack/include/lstack_epoll.h
@@ -19,14 +19,11 @@
#include <pthread.h>
#include <lwip/lwipgz_list.h>
+#include <lwip/lwipgz_sock.h>
#include "common/gazelle_dfx_msg.h"
#include "common/gazelle_opt.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
enum wakeup_type {
WAKEUP_EPOLL = 0,
WAKEUP_POLL,
@@ -61,9 +58,6 @@ struct wakeup_poll {
pthread_spinlock_t event_list_lock;
};
-struct netconn;
-struct lwip_sock;
-
void add_sock_event(struct lwip_sock *sock, uint32_t event);
void add_sock_event_nolock(struct lwip_sock *sock, uint32_t event);
void del_sock_event(struct lwip_sock *sock, uint32_t event);
@@ -91,8 +85,4 @@ static inline void lstack_block_wakeup(struct wakeup_poll *wakeup)
}
}
-#ifdef __cplusplus
-}
-#endif
-
#endif /* _GAZELLE_EPOLL_H_ */
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index b972f11..0c7bb62 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -15,8 +15,12 @@
#include <stdbool.h>
#include "common/gazelle_dfx_msg.h"
+#include "common/dpdk_common.h"
struct lwip_sock;
+struct rpc_msg;
+struct protocol_stack;
+
unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_ACCEPTIN(sock) (((sock)->conn->acceptmbox != NULL) && !sys_mbox_empty((sock)->conn->acceptmbox))
@@ -25,11 +29,6 @@ unsigned same_node_ring_count(struct lwip_sock *sock);
#define NETCONN_IS_OUTIDLE(sock) gazelle_ring_readable_count((sock)->send_ring)
#define NETCONN_IS_UDP(sock) (NETCONNTYPE_GROUP(netconn_type((sock)->conn)) == NETCONN_UDP)
-struct rte_mempool;
-struct rpc_msg;
-struct rte_mbuf;
-struct protocol_stack;
-
void do_lwip_clone_sockopt(struct lwip_sock *dst_sock, struct lwip_sock *src_sock);
struct pbuf *do_lwip_tcp_get_from_sendring(struct lwip_sock *sock, uint16_t remain_size);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 7dce757..fdd5388 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -17,6 +17,10 @@
#include <sys/epoll.h>
#include <stdbool.h>
+#include <rte_ring.h>
+#include <rte_mbuf.h>
+#include <rte_mempool.h>
+
#include <lwip/lwipgz_list.h>
#include <lwip/netif.h>
@@ -35,10 +39,6 @@
#define MBUFPOOL_RESERVE_NUM (get_global_cfg_params()->nic.rxqueue_size + 1024)
-struct rte_mempool;
-struct rte_ring;
-struct rte_mbuf;
-
struct protocol_stack {
uint32_t tid;
uint16_t queue_id;
@@ -111,50 +111,23 @@ struct protocol_stack_group {
};
long get_stack_tid(void);
+
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
struct protocol_stack *get_bind_protocol_stack(void);
struct protocol_stack_group *get_protocol_stack_group(void);
+int get_min_conn_stack(struct protocol_stack_group *stack_group);
+void bind_to_stack_numa(struct protocol_stack *stack);
+void thread_bind_stack(struct protocol_stack *stack);
+
int32_t stack_group_init(void);
void stack_group_exit(void);
+void stack_exit(void);
+
int32_t stack_setup_thread(void);
int32_t stack_setup_app_thread(void);
-void bind_to_stack_numa(struct protocol_stack *stack);
-int32_t init_dpdk_ethdev(void);
-
-void wait_sem_value(sem_t *sem, int32_t wait_value);
-
-/* any protocol stack thread receives arp packet and sync it to other threads so that it can have the arp table */
-void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack);
-
-/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
-int32_t stack_broadcast_close(int32_t fd);
-
-int stack_broadcast_shutdown(int fd, int how);
-
-/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
-int32_t stack_broadcast_listen(int32_t fd, int backlog);
-int32_t stack_single_listen(int32_t fd, int32_t backlog);
-
-/* bind sync to all protocol stack thread, only for udp protocol */
-int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
-int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen);
-
-/* ergodic the protocol stack thread to find the connection, because all threads are listening */
-int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags);
-
-struct wakeup_poll;
-void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
-
-void stack_send_pkts(struct protocol_stack *stack);
-
-struct thread_params {
- uint16_t queue_id;
- uint16_t idx;
-};
-
int stack_polling(uint32_t wakeup_tick);
+
#endif
diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h
deleted file mode 100644
index 77b18bd..0000000
--- a/src/lstack/include/lstack_rpc_proc.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
-* gazelle is licensed under the Mulan PSL v2.
-* You can use this software according to the terms and conditions of the Mulan PSL v2.
-* You may obtain a copy of Mulan PSL v2 at:
-* http://license.coscl.org.cn/MulanPSL2
-* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
-* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
-* PURPOSE.
-* See the Mulan PSL v2 for more details.
-*/
-
-#ifndef __GAZELLE_RPC_PROC_H__
-#define __GAZELLE_RPC_PROC_H__
-#include "lstack_thread_rpc.h"
-
-void stack_clean_epoll(struct rpc_msg *msg);
-void stack_arp(struct rpc_msg *msg);
-void stack_socket(struct rpc_msg *msg);
-void stack_close(struct rpc_msg *msg);
-void stack_shutdown(struct rpc_msg *msg);
-void stack_bind(struct rpc_msg *msg);
-void stack_listen(struct rpc_msg *msg);
-void stack_accept(struct rpc_msg *msg);
-void stack_connect(struct rpc_msg *msg);
-void stack_recv(struct rpc_msg *msg);
-void stack_getpeername(struct rpc_msg *msg);
-void stack_getsockname(struct rpc_msg *msg);
-void stack_getsockopt(struct rpc_msg *msg);
-void stack_setsockopt(struct rpc_msg *msg);
-void stack_fcntl(struct rpc_msg *msg);
-void stack_ioctl(struct rpc_msg *msg);
-void stack_tcp_send(struct rpc_msg *msg);
-void stack_udp_send(struct rpc_msg *msg);
-void stack_mempool_size(struct rpc_msg *msg);
-void stack_rpcpool_size(struct rpc_msg *msg);
-void stack_create_shadow_fd(struct rpc_msg *msg);
-void stack_replenish_sendring(struct rpc_msg *msg);
-void stack_get_conntable(struct rpc_msg *msg);
-void stack_get_connnum(struct rpc_msg *msg);
-void stack_recvlist_count(struct rpc_msg *msg);
-void stack_exit_by_rpc(struct rpc_msg *msg);
-
-void thread_register_phase1(struct rpc_msg *msg);
-void thread_register_phase2(struct rpc_msg *msg);
-
-#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index d268366..c2654bb 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -35,8 +35,8 @@ struct rpc_stats {
struct rpc_msg;
typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
- int32_t i;
- uint32_t u;
+ int i;
+ unsigned int u;
long l;
unsigned long ul;
void *p;
@@ -63,50 +63,43 @@ static inline void rpc_queue_init(rpc_queue *queue)
{
lockless_queue_init(queue);
}
-
struct rpc_stats *rpc_stats_get(void);
-int32_t rpc_msgcnt(rpc_queue *queue);
-int rpc_poll_msg(rpc_queue *queue, uint32_t max_num);
+int rpc_msgcnt(rpc_queue *queue);
+int rpc_poll_msg(rpc_queue *queue, int max_num);
+
+int rpc_call_stack_exit(rpc_queue *queue);
+
+/* #include <sys/socket.h> will conflict with lwip/sockets.h */
+struct sockaddr;
+
+int rpc_call_close(rpc_queue *queue, int fd);
+int rpc_call_shutdown(rpc_queue *queue, int fd, int how);
+int rpc_call_socket(rpc_queue *queue, int domain, int type, int protocol);
+int rpc_call_bind(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+int rpc_call_listen(rpc_queue *queue, int s, int backlog);
+int rpc_call_shadow_fd(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+int rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+int rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+
+int rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
+int rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen);
+
+int rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
+int rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
+
+int rpc_call_replenish(rpc_queue *queue, void *sock);
+int rpc_call_recvlistcnt(rpc_queue *queue);
+
void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
-int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_recvlistcnt(rpc_queue *queue);
-int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn);
-int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn);
-int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn);
-int32_t rpc_call_connnum(rpc_queue *queue);
-int32_t rpc_call_arp(rpc_queue *queue, void *mbuf);
-int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol);
-int32_t rpc_call_close(rpc_queue *queue, int32_t fd);
-int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how);
-int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog);
-int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_tcp_send(rpc_queue *queue, int fd, size_t len, int flags);
-int32_t rpc_call_udp_send(rpc_queue *queue, int fd, size_t len, int flags);
-int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
-int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen);
-int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val);
-int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp);
-int32_t rpc_call_replenish(rpc_queue *queue, void *sock);
-int32_t rpc_call_mbufpoolsize(rpc_queue *queue);
-int32_t rpc_call_stack_exit(rpc_queue *queue);
-
-static inline __attribute__((always_inline)) void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
-{
- lockless_queue_mpsc_push(queue, &msg->queue_node);
-}
+int rpc_call_arp(rpc_queue *queue, void *mbuf);
-static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
-{
- pthread_spin_destroy(&msg->lock);
- if (msg->rpcpool != NULL && msg->rpcpool->mempool != NULL) {
- rte_mempool_put(msg->rpcpool->mempool, (void *)msg);
- } else {
- free(msg);
- }
-}
+int rpc_call_conntable(rpc_queue *queue, void *conn_table, unsigned max_conn);
+int rpc_call_connnum(rpc_queue *queue);
+int rpc_call_mbufpoolsize(rpc_queue *queue);
+
+int rpc_call_thread_regphase1(rpc_queue *queue, void *conn);
+int rpc_call_thread_regphase2(rpc_queue *queue, void *conn);
#endif
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index cf66e15..4f3cbc1 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -43,6 +43,62 @@
#define MBUF_MAX_LEN 1514
#define PACKET_READ_SIZE 32
+/* any protocol stack thread receives arp packet and sync it to other threads,
+ * so that it can have the arp table */
+static void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = NULL;
+ int32_t ret;
+
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ if (cur_stack == stack) {
+ continue;
+ }
+
+ /* stack maybe not init in app thread yet */
+ if (stack == NULL || !(netif_is_up(&stack->netif))) {
+ continue;
+ }
+
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
+ if (ret != 0) {
+ rte_pktmbuf_free(mbuf_copy);
+ return;
+ }
+ }
+#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
+ if (get_global_cfg_params()->kni_switch) {
+ ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ cur_stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+ kni_handle_tx(mbuf_copy);
+ }
+#endif
+ if (get_global_cfg_params()->flow_bifurcation) {
+ ret = dpdk_alloc_pktmbuf(cur_stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
+ if (ret != 0) {
+ cur_stack->stats.rx_allocmbuf_fail++;
+ return;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+ virtio_tap_process_tx(cur_stack->queue_id, mbuf_copy);
+ }
+ return;
+}
+
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
{
int32_t ret;
--
2.33.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。