代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From fe39b43f897be7d29f9b51e79d51395e43b83e23 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Sun, 4 Feb 2024 19:46:17 +0800
Subject: [PATCH] rpc function does not depend on protocol stack diff rpc queue
and dfx rpc queue
---
src/common/gazelle_dfx_msg.h | 1 -
src/lstack/api/lstack_rtw_api.c | 36 ++-
src/lstack/core/lstack_control_plane.c | 10 +-
src/lstack/core/lstack_dpdk.c | 4 +-
src/lstack/core/lstack_lwip.c | 2 +-
src/lstack/core/lstack_protocol_stack.c | 92 ++++----
src/lstack/core/lstack_stack_stat.c | 18 +-
src/lstack/core/lstack_thread_rpc.c | 241 ++++++++-------------
src/lstack/include/lstack_control_plane.h | 3 -
src/lstack/include/lstack_protocol_stack.h | 36 +--
src/lstack/include/lstack_rpc_proc.h | 46 ++++
src/lstack/include/lstack_thread_rpc.h | 79 +++----
src/lstack/netif/lstack_ethdev.c | 6 +-
src/ltran/ltran_dfx.c | 3 +-
14 files changed, 292 insertions(+), 285 deletions(-)
create mode 100644 src/lstack/include/lstack_rpc_proc.h
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index 1ca210b..a91a30f 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -101,7 +101,6 @@ struct gazelle_stack_aggregate_stats {
struct gazelle_stat_pkts {
uint16_t conn_num;
uint32_t mbufpool_avail_cnt;
- uint32_t rpcpool_avail_cnt;
uint64_t call_msg_cnt;
uint64_t recv_list_cnt;
uint64_t call_alloc_fail;
diff --git a/src/lstack/api/lstack_rtw_api.c b/src/lstack/api/lstack_rtw_api.c
index 10bc613..8498b8e 100644
--- a/src/lstack/api/lstack_rtw_api.c
+++ b/src/lstack/api/lstack_rtw_api.c
@@ -28,7 +28,11 @@
int rtw_socket(int domain, int type, int protocol)
{
- return rpc_call_socket(domain, type, protocol);
+ struct protocol_stack *stack = get_bind_protocol_stack();
+ if (stack == NULL) {
+ GAZELLE_RETURN(EINVAL);
+ }
+ return rpc_call_socket(&stack->rpc_queue, domain, type, protocol);
}
int rtw_accept(int s, struct sockaddr *addr, socklen_t *addrlen)
@@ -64,27 +68,47 @@ int rtw_listen(int s, int backlog)
int rtw_connect(int s, const struct sockaddr *name, socklen_t namelen)
{
- return rpc_call_connect(s, name, namelen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(s);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_connect(&stack->rpc_queue, s, name, namelen);
}
int rtw_setsockopt(int s, int level, int optname, const void *optval, socklen_t optlen)
{
- return rpc_call_setsockopt(s, level, optname, optval, optlen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(s);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_setsockopt(&stack->rpc_queue, s, level, optname, optval, optlen);
}
int rtw_getsockopt(int s, int level, int optname, void *optval, socklen_t *optlen)
{
- return rpc_call_getsockopt(s, level, optname, optval, optlen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(s);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_getsockopt(&stack->rpc_queue, s, level, optname, optval, optlen);
}
int rtw_getpeername(int s, struct sockaddr *name, socklen_t *namelen)
{
- return rpc_call_getpeername(s, name, namelen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(s);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_getpeername(&stack->rpc_queue, s, name, namelen);
}
int rtw_getsockname(int s, struct sockaddr *name, socklen_t *namelen)
{
- return rpc_call_getsockname(s, name, namelen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(s);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_getsockname(&stack->rpc_queue, s, name, namelen);
}
ssize_t rtw_read(int s, void *mem, size_t len)
diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c
index a9a3814..025291d 100644
--- a/src/lstack/core/lstack_control_plane.c
+++ b/src/lstack/core/lstack_control_plane.c
@@ -611,9 +611,10 @@ static int32_t thread_register(void)
/* register all connected conn before listen conn, avoid creating new conn */
struct protocol_stack_group *stack_group = get_protocol_stack_group();
for (int32_t i = 0; i < stack_group->stack_num; i++) {
- conn->conn_num = rpc_call_conntable(stack_group->stacks[i], conn->conn_list, GAZELLE_LSTACK_MAX_CONN);
+ conn->conn_num = rpc_call_conntable(&stack_group->stacks[i]->rpc_queue,
+ conn->conn_list, GAZELLE_LSTACK_MAX_CONN);
- ret = rpc_call_thread_regphase1(stack_group->stacks[i], conn);
+ ret = rpc_call_thread_regphase1(&stack_group->stacks[i]->rpc_queue, conn);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "thread_register_phase1 failed ret=%d!\n", ret);
free(conn);
@@ -622,9 +623,10 @@ static int32_t thread_register(void)
}
for (int32_t i = 0; i < stack_group->stack_num; i++) {
- conn->conn_num = rpc_call_conntable(stack_group->stacks[i], conn->conn_list, GAZELLE_LSTACK_MAX_CONN);
+ conn->conn_num = rpc_call_conntable(&stack_group->stacks[i]->rpc_queue,
+ conn->conn_list, GAZELLE_LSTACK_MAX_CONN);
- ret = rpc_call_thread_regphase2(stack_group->stacks[i], conn);
+ ret = rpc_call_thread_regphase2(&stack_group->stacks[i]->rpc_queue, conn);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "thread_register_phase2 failed ret=%d!\n", ret);
free(conn);
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index e352850..985f1a5 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -45,7 +45,6 @@
#include "lstack_log.h"
#include "dpdk_common.h"
-#include "lstack_lockless_queue.h"
#include "lstack_protocol_stack.h"
#include "lstack_thread_rpc.h"
#include "lstack_lwip.h"
@@ -258,7 +257,8 @@ struct rte_mempool *create_mempool(const char *name, uint32_t count, uint32_t si
int32_t create_shared_ring(struct protocol_stack *stack)
{
- lockless_queue_init(&stack->rpc_queue);
+ rpc_queue_init(&stack->rpc_queue);
+ rpc_queue_init(&stack->dfx_rpc_queue);
if (use_ltran()) {
stack->rx_ring = gazelle_ring_create_fast("RING_RX", VDEV_RX_QUEUE_SZ, RING_F_SP_ENQ | RING_F_SC_DEQ);
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 3f76424..b79cdef 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -636,7 +636,7 @@ static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t
{
// 2: call_num >= 2, don't need add new rpc send
if (__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) < 2) {
- while (rpc_call_send(fd, NULL, len, flags) < 0) {
+ while (rpc_call_send(&sock->stack->rpc_queue, fd, NULL, len, flags) < 0) {
usleep(1000); // 1000: wait 1ms to exec again
}
__sync_fetch_and_add(&sock->call_num, 1);
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 8b99e82..f63fcb0 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -465,7 +465,10 @@ int stack_polling(uint32_t wakeup_tick)
uint32_t read_connect_number = cfg->read_connect_number;
struct protocol_stack *stack = get_protocol_stack();
- force_quit = poll_rpc_msg(stack, rpc_number);
+ /* 2: one dfx consumes two rpc */
+ rpc_poll_msg(&stack->dfx_rpc_queue, 2);
+ force_quit = rpc_poll_msg(&stack->rpc_queue, rpc_number);
+
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
sys_timer_run();
if (cfg->low_power_mod != 0) {
@@ -715,7 +718,7 @@ OUT2:
void stack_arp(struct rpc_msg *msg)
{
struct rte_mbuf *mbuf = (struct rte_mbuf *)msg->args[MSG_ARG_0].p;
- struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_1].p;
+ struct protocol_stack *stack = get_protocol_stack();
eth_dev_recv(mbuf, stack);
}
@@ -893,7 +896,7 @@ void stack_send(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
size_t len = msg->args[MSG_ARG_1].size;
- struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_3].p;
+ struct protocol_stack *stack = get_protocol_stack();
int replenish_again;
struct lwip_sock *sock = get_socket(fd);
@@ -947,7 +950,7 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
}
copy_mbuf(mbuf_copy, mbuf);
- ret = rpc_call_arp(stack, mbuf_copy);
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
if (ret != 0) {
return;
}
@@ -971,7 +974,7 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
for (int32_t i = 0; i < stack_group->stack_num; i++) {
stack = stack_group->stacks[i];
- rpc_call_clean_epoll(stack, wakeup);
+ rpc_call_clean_epoll(&stack->rpc_queue, wakeup);
}
}
@@ -985,17 +988,11 @@ void stack_clean_epoll(struct rpc_msg *msg)
void stack_mempool_size(struct rpc_msg *msg)
{
- struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p;
+ struct protocol_stack *stack = get_protocol_stack();
msg->result = rte_mempool_avail_count(stack->rxtx_mbuf_pool);
}
-void stack_rpcpool_size(struct rpc_msg *msg)
-{
- struct rpc_msg_pool *rpc_mem_pool = (struct rpc_msg_pool*)msg->args[MSG_ARG_0].p;
- msg->result = rte_mempool_avail_count(rpc_mem_pool->mempool);
-}
-
void stack_create_shadow_fd(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
@@ -1049,8 +1046,8 @@ void stack_create_shadow_fd(struct rpc_msg *msg)
void stack_replenish_sendring(struct rpc_msg *msg)
{
- struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p;
- struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_1].p;
+ struct protocol_stack *stack = get_protocol_stack();
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
msg->result = do_lwip_replenish_sendring(stack, sock);
}
@@ -1070,7 +1067,7 @@ void stack_get_connnum(struct rpc_msg *msg)
void stack_recvlist_count(struct rpc_msg *msg)
{
- struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_0].p;
+ struct protocol_stack *stack = get_protocol_stack();
struct list_node *list = &stack->recv_list;
uint32_t count = 0;
struct list_node *node;
@@ -1086,16 +1083,16 @@ void stack_recvlist_count(struct rpc_msg *msg)
/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
int32_t stack_broadcast_close(int32_t fd)
{
- struct lwip_sock *sock = get_socket(fd);
int32_t ret = 0;
-
+ struct lwip_sock *sock = get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
if (sock == NULL) {
- return -1;
+ GAZELLE_RETURN(EBADF);
}
do {
sock = sock->listen_next;
- if (rpc_call_close(fd)) {
+ if (stack == NULL || rpc_call_close(&stack->rpc_queue, fd)) {
ret = -1;
}
@@ -1103,7 +1100,8 @@ int32_t stack_broadcast_close(int32_t fd)
break;
}
fd = sock->conn->callback_arg.socket;
- } while (sock);
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
return ret;
}
@@ -1112,13 +1110,14 @@ int stack_broadcast_shutdown(int fd, int how)
{
int32_t ret = 0;
struct lwip_sock *sock = get_socket(fd);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
if (sock == NULL) {
- return -1;
+ GAZELLE_RETURN(EBADF);
}
do {
sock = sock->listen_next;
- if (rpc_call_shutdown(fd, how)) {
+ if (stack == NULL || rpc_call_shutdown(&stack->rpc_queue, fd, how)) {
ret = -1;
}
@@ -1126,7 +1125,8 @@ int stack_broadcast_shutdown(int fd, int how)
break;
}
fd = sock->conn->callback_arg.socket;
- } while (sock);
+ stack = get_protocol_stack_by_fd(fd);
+ } while (1);
return ret;
}
@@ -1134,7 +1134,11 @@ int stack_broadcast_shutdown(int fd, int how)
/* choice one stack listen */
int32_t stack_single_listen(int32_t fd, int32_t backlog)
{
- return rpc_call_listen(fd, backlog);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_listen(&stack->rpc_queue, fd, backlog);
}
/* listen sync to all protocol stack thread, so that any protocol stack thread can build connect */
@@ -1153,12 +1157,12 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
int32_t ret, clone_fd;
struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EINVAL);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
}
- ret = rpc_call_getsockname(fd, (struct sockaddr *)&addr, &addr_len);
+ ret = rpc_call_getsockname(&cur_stack->rpc_queue, fd, (struct sockaddr *)&addr, &addr_len);
if (ret != 0) {
return ret;
}
@@ -1172,7 +1176,7 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
continue;
}
if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(stack, fd, (struct sockaddr *)&addr, addr_len);
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, (struct sockaddr *)&addr, addr_len);
if (clone_fd < 0) {
stack_broadcast_close(fd);
return clone_fd;
@@ -1187,7 +1191,7 @@ int32_t stack_broadcast_listen(int32_t fd, int32_t backlog)
get_socket_by_fd(clone_fd)->conn->is_master_fd = 0;
}
- ret = rpc_call_listen(clone_fd, backlog);
+ ret = rpc_call_listen(&stack->rpc_queue, clone_fd, backlog);
if (ret < 0) {
stack_broadcast_close(fd);
return ret;
@@ -1234,7 +1238,11 @@ static void inline del_accept_in_event(struct lwip_sock *sock)
/* choice one stack bind */
int32_t stack_single_bind(int32_t fd, const struct sockaddr *name, socklen_t namelen)
{
- return rpc_call_bind(fd, name, namelen);
+ struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ return rpc_call_bind(&stack->rpc_queue, fd, name, namelen);
}
/* bind sync to all protocol stack thread, so that any protocol stack thread can build connect */
@@ -1245,12 +1253,12 @@ int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t
int32_t ret, clone_fd;
struct lwip_sock *sock = get_socket(fd);
- if (sock == NULL) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null\n", get_stack_tid(), fd);
- GAZELLE_RETURN(EINVAL);
+ if (sock == NULL || cur_stack == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %d get sock null or stack null\n", get_stack_tid(), fd);
+ GAZELLE_RETURN(EBADF);
}
- ret = rpc_call_bind(fd, name, namelen);
+ ret = rpc_call_bind(&cur_stack->rpc_queue, fd, name, namelen);
if (ret < 0) {
close(fd);
return ret;
@@ -1260,7 +1268,7 @@ int32_t stack_broadcast_bind(int32_t fd, const struct sockaddr *name, socklen_t
for (int32_t i = 0; i < stack_group->stack_num; ++i) {
stack = stack_group->stacks[i];
if (stack != cur_stack) {
- clone_fd = rpc_call_shadow_fd(stack, fd, name, namelen);
+ clone_fd = rpc_call_shadow_fd(&stack->rpc_queue, fd, name, namelen);
if (clone_fd < 0) {
stack_broadcast_close(fd);
return clone_fd;
@@ -1276,9 +1284,9 @@ int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *ad
int32_t ret = -1;
struct lwip_sock *min_sock = NULL;
struct lwip_sock *sock = get_socket(fd);
+ struct protocol_stack *stack = NULL;
if (sock == NULL) {
- errno = EINVAL;
- return -1;
+ GAZELLE_RETURN(EBADF);
}
if (netconn_is_nonblocking(sock->conn)) {
@@ -1290,7 +1298,11 @@ int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *ad
}
if (min_sock && min_sock->conn) {
- ret = rpc_call_accept(min_sock->conn->callback_arg.socket, addr, addrlen, flags);
+ stack = get_protocol_stack_by_fd(min_sock->conn->callback_arg.socket);
+ if (stack == NULL) {
+ GAZELLE_RETURN(EBADF);
+ }
+ ret = rpc_call_accept(&stack->rpc_queue, min_sock->conn->callback_arg.socket, addr, addrlen, flags);
}
if (min_sock && min_sock->wakeup && min_sock->wakeup->type == WAKEUP_EPOLL) {
@@ -1344,7 +1356,7 @@ void stack_group_exit(void)
}
if (stack != stack_group->stacks[i]) {
- rpc_call_stack_exit(stack_group->stacks[i]);
+ rpc_call_stack_exit(&stack_group->stacks[i]->rpc_queue);
}
}
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 23571b4..01ac6fb 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -175,20 +175,17 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
get_wakeup_stat(stack_group, stack, &dfx->data.pkts.wakeup_stat);
- dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
+ dfx->data.pkts.call_alloc_fail = rpc_stats_get()->call_alloc_fail;
- int32_t rpc_call_result = rpc_call_msgcnt(stack);
+ int32_t rpc_call_result = rpc_msgcnt(&stack->rpc_queue);
dfx->data.pkts.call_msg_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_mbufpoolsize(stack);
+ rpc_call_result = rpc_call_mbufpoolsize(&stack->dfx_rpc_queue);
dfx->data.pkts.mbufpool_avail_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_recvlistcnt(stack);
+ rpc_call_result = rpc_call_recvlistcnt(&stack->dfx_rpc_queue);
dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_rpcpool_size(stack);
- dfx->data.pkts.rpcpool_avail_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
-
dfx->data.pkts.conn_num = stack->conn_num;
}
@@ -219,9 +216,10 @@ static void get_stack_dfx_data(struct gazelle_stack_dfx_data *dfx, struct protoc
}
break;
case GAZELLE_STAT_LSTACK_SHOW_CONN:
- rpc_call_result = rpc_call_conntable(stack, dfx->data.conn.conn_list, GAZELLE_LSTACK_MAX_CONN);
+ rpc_call_result = rpc_call_conntable(&stack->dfx_rpc_queue, dfx->data.conn.conn_list,
+ GAZELLE_LSTACK_MAX_CONN);
dfx->data.conn.conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_connnum(stack);
+ rpc_call_result = rpc_call_connnum(&stack->dfx_rpc_queue);
dfx->data.conn.total_conn_num = (rpc_call_result < 0) ? 0 : rpc_call_result;
break;
case GAZELLE_STAT_LSTACK_SHOW_LATENCY:
@@ -296,7 +294,7 @@ int handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode)
}
dfx.tid = stack->tid;
- dfx.stack_id = i;
+ dfx.stack_id = i;
if (i == stack_group->stack_num - 1) {
dfx.eof = 1;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 2af30d7..1fdb037 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -9,21 +9,20 @@
* PURPOSE.
* See the Mulan PSL v2 for more details.
*/
-#include <sys/types.h>
-#include <stdatomic.h>
#include <lwip/sockets.h>
-#include <lwipsock.h>
#include <rte_mempool.h>
#include "lstack_log.h"
-#include "lstack_lwip.h"
-#include "lstack_protocol_stack.h"
-#include "lstack_control_plane.h"
-#include "gazelle_base_func.h"
#include "lstack_dpdk.h"
+#include "lstack_rpc_proc.h"
#include "lstack_thread_rpc.h"
static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
+static struct rpc_stats g_rpc_stats;
+struct rpc_stats *rpc_stats_get(void)
+{
+ return &g_rpc_stats;
+}
static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
{
@@ -37,33 +36,29 @@ static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct
return msg;
}
-static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func func)
+static struct rpc_msg *rpc_msg_alloc(rpc_msg_func func)
{
struct rpc_msg *msg = NULL;
- if (stack == NULL) {
- return NULL;
- }
-
if (g_rpc_pool == NULL) {
g_rpc_pool = calloc(1, sizeof(struct rpc_msg_pool));
if (g_rpc_pool == NULL) {
LSTACK_LOG(INFO, LSTACK, "g_rpc_pool calloc failed\n");
- get_protocol_stack_group()->call_alloc_fail++;
+ g_rpc_stats.call_alloc_fail++;
return NULL;
}
g_rpc_pool->mempool = create_mempool("rpc_pool", RPC_MSG_MAX, sizeof(struct rpc_msg),
0, rte_gettid());
if (g_rpc_pool->mempool == NULL) {
- get_protocol_stack_group()->call_alloc_fail++;
+ g_rpc_stats.call_alloc_fail++;
return NULL;
}
}
msg = get_rpc_msg(g_rpc_pool);
if (msg == NULL) {
- get_protocol_stack_group()->call_alloc_fail++;
+ g_rpc_stats.call_alloc_fail++;
return NULL;
}
msg->rpcpool = g_rpc_pool;
@@ -75,7 +70,7 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func
return msg;
}
-static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg)
+static inline __attribute__((always_inline)) int32_t rpc_sync_call(rpc_queue *queue, struct rpc_msg *msg)
{
int32_t ret;
@@ -90,13 +85,18 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queu
return ret;
}
-int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
+int32_t rpc_msgcnt(rpc_queue *queue)
+{
+ return lockless_queue_count(queue);
+}
+
+int rpc_poll_msg(rpc_queue *queue, uint32_t max_num)
{
int force_quit = 0;
struct rpc_msg *msg = NULL;
while (max_num--) {
- lockless_queue_node *node = lockless_queue_mpsc_pop(&stack->rpc_queue);
+ lockless_queue_node *node = lockless_queue_mpsc_pop(queue);
if (node == NULL) {
break;
}
@@ -106,7 +106,7 @@ int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
if (msg->func) {
msg->func(msg);
} else {
- stack->stats.call_null++;
+ g_rpc_stats.call_null++;
}
if (msg->func == stack_exit_by_rpc) {
@@ -127,9 +127,9 @@ int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
return force_quit;
}
-int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn)
+int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_get_conntable);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_get_conntable);
if (msg == NULL) {
return -1;
}
@@ -137,22 +137,22 @@ int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint3
msg->args[MSG_ARG_0].p = conn_table;
msg->args[MSG_ARG_1].u = max_conn;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connnum(struct protocol_stack *stack)
+int32_t rpc_call_connnum(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_get_connnum);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_get_connnum);
if (msg == NULL) {
return -1;
}
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_create_shadow_fd);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_create_shadow_fd);
if (msg == NULL) {
return -1;
}
@@ -161,100 +161,67 @@ int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struc
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
-static void rpc_msgcnt(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = get_protocol_stack();
- msg->result = lockless_queue_count(&stack->rpc_queue);
-}
-
-int32_t rpc_call_msgcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_msgcnt);
- if (msg == NULL) {
- return -1;
- }
-
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn)
+int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase1);
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase1);
if (msg == NULL) {
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
+int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, thread_register_phase2);
+ struct rpc_msg *msg = rpc_msg_alloc(thread_register_phase2);
if (msg == NULL) {
return -1;
}
msg->args[MSG_ARG_0].p = conn;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_mbufpoolsize(struct protocol_stack *stack)
+int32_t rpc_call_mbufpoolsize(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_mempool_size);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_mempool_size);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].p = stack;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
-int32_t rpc_call_rpcpool_size(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_rpcpool_size);
- if (msg == NULL) {
- return -1;
- }
- msg->args[MSG_ARG_0].p = g_rpc_pool;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
+int32_t rpc_call_recvlistcnt(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_recvlist_count);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].p = stack;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf)
+int32_t rpc_call_arp(rpc_queue *queue, void *mbuf)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_arp);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_arp);
if (msg == NULL) {
return -1;
}
msg->sync_flag = 0;
msg->args[MSG_ARG_0].p = mbuf;
- msg->args[MSG_ARG_1].p = stack;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol)
+int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol)
{
- struct protocol_stack *stack = get_bind_protocol_stack();
-
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_socket);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_socket);
if (msg == NULL) {
return -1;
}
@@ -263,39 +230,35 @@ int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol)
msg->args[MSG_ARG_1].i = type;
msg->args[MSG_ARG_2].i = protocol;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_close(int fd)
+int32_t rpc_call_close(rpc_queue *queue, int fd)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_close);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_close);
if (msg == NULL) {
return -1;
}
msg->args[MSG_ARG_0].i = fd;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_stack_exit(struct protocol_stack *stack)
+int32_t rpc_call_stack_exit(rpc_queue *queue)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_exit_by_rpc);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_exit_by_rpc);
if (msg == NULL) {
- LSTACK_LOG(INFO, LSTACK, "rpc msg alloc failed\n");
return -1;
}
- rpc_call(&stack->rpc_queue, msg);
+ rpc_call(queue, msg);
return 0;
}
-int32_t rpc_call_shutdown(int fd, int how)
+int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
-
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_shutdown);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_shutdown);
if (msg == NULL) {
return -1;
}
@@ -303,25 +266,24 @@ int32_t rpc_call_shutdown(int fd, int how)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].i = how;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup)
+void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_clean_epoll);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_clean_epoll);
if (msg == NULL) {
return;
}
msg->args[MSG_ARG_0].p = wakeup;
- rpc_sync_call(&stack->rpc_queue, msg);
+ rpc_sync_call(queue, msg);
}
-int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
+int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_bind);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_bind);
if (msg == NULL) {
return -1;
}
@@ -330,13 +292,12 @@ int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_listen(int s, int backlog)
+int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(s);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_listen);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_listen);
if (msg == NULL) {
return -1;
}
@@ -344,13 +305,12 @@ int32_t rpc_call_listen(int s, int backlog)
msg->args[MSG_ARG_0].i = s;
msg->args[MSG_ARG_1].i = backlog;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
+int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_accept);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_accept);
if (msg == NULL) {
return -1;
}
@@ -360,13 +320,12 @@ int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int f
msg->args[MSG_ARG_2].p = addrlen;
msg->args[MSG_ARG_3].i = flags;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
+int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_connect);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_connect);
if (msg == NULL) {
return -1;
}
@@ -375,7 +334,7 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
msg->args[MSG_ARG_1].cp = addr;
msg->args[MSG_ARG_2].socklen = addrlen;
- int32_t ret = rpc_sync_call(&stack->rpc_queue, msg);
+ int32_t ret = rpc_sync_call(queue, msg);
if (ret < 0) {
errno = -ret;
return -1;
@@ -383,10 +342,9 @@ int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen)
return ret;
}
-int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
+int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getpeername);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_getpeername);
if (msg == NULL) {
return -1;
}
@@ -395,13 +353,12 @@ int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
+int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockname);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_getsockname);
if (msg == NULL) {
return -1;
}
@@ -410,13 +367,12 @@ int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen)
msg->args[MSG_ARG_1].p = addr;
msg->args[MSG_ARG_2].p = addrlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen)
+int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_getsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_getsockopt);
if (msg == NULL) {
return -1;
}
@@ -427,13 +383,12 @@ int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, sockle
msg->args[MSG_ARG_3].p = optval;
msg->args[MSG_ARG_4].p = optlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen)
+int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_setsockopt);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_setsockopt);
if (msg == NULL) {
return -1;
}
@@ -444,13 +399,12 @@ int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval,
msg->args[MSG_ARG_3].cp = optval;
msg->args[MSG_ARG_4].socklen = optlen;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_fcntl(int fd, int cmd, long val)
+int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_fcntl);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_fcntl);
if (msg == NULL) {
return -1;
}
@@ -459,13 +413,12 @@ int32_t rpc_call_fcntl(int fd, int cmd, long val)
msg->args[MSG_ARG_1].i = cmd;
msg->args[MSG_ARG_2].l = val;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
+int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_ioctl);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_ioctl);
if (msg == NULL) {
return -1;
}
@@ -474,27 +427,24 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp)
msg->args[MSG_ARG_1].l = cmd;
msg->args[MSG_ARG_2].p = argp;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock)
+int32_t rpc_call_replenish(rpc_queue *queue, void *sock)
{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_replenish_sendring);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_replenish_sendring);
if (msg == NULL) {
return -1;
}
- msg->args[MSG_ARG_0].p = stack;
- msg->args[MSG_ARG_1].p = sock;
+ msg->args[MSG_ARG_0].p = sock;
- return rpc_sync_call(&stack->rpc_queue, msg);
+ return rpc_sync_call(queue, msg);
}
-int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
+int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags)
{
- struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
-
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_send);
+ struct rpc_msg *msg = rpc_msg_alloc(stack_send);
if (msg == NULL) {
return -1;
}
@@ -502,10 +452,9 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
msg->args[MSG_ARG_0].i = fd;
msg->args[MSG_ARG_1].size = len;
msg->args[MSG_ARG_2].i = flags;
- msg->args[MSG_ARG_3].p = stack;
msg->sync_flag = 0;
- rpc_call(&stack->rpc_queue, msg);
+ rpc_call(queue, msg);
return 0;
}
diff --git a/src/lstack/include/lstack_control_plane.h b/src/lstack/include/lstack_control_plane.h
index aed5443..548d725 100644
--- a/src/lstack/include/lstack_control_plane.h
+++ b/src/lstack/include/lstack_control_plane.h
@@ -23,14 +23,11 @@ enum vdev_request {
VDEV_NONE,
};
-struct rpc_msg;
int client_reg_thrd_ring(void);
int32_t control_init_client(bool is_reconnect);
void control_client_thread(void *arg);
void control_server_thread(void *arg);
bool get_register_state(void);
-void thread_register_phase1(struct rpc_msg *msg);
-void thread_register_phase2(struct rpc_msg *msg);
void control_fd_close(void);
void delete_primary_path(void);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index c681547..7489f2a 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -21,7 +21,7 @@
#include <lwip/netif.h>
#include "gazelle_dfx_msg.h"
-#include "lstack_lockless_queue.h"
+#include "lstack_thread_rpc.h"
#include "lstack_ethdev.h"
#include "gazelle_opt.h"
@@ -59,13 +59,15 @@ struct protocol_stack {
volatile bool low_power;
bool is_send_thread;
- lockless_queue rpc_queue __rte_cache_aligned;
- char pad __rte_cache_aligned;
+ char pad1 __rte_cache_aligned;
+ rpc_queue dfx_rpc_queue;
+ rpc_queue rpc_queue;
+ char pad2 __rte_cache_aligned;
/* kernel event thread read/write frequently */
struct epoll_event kernel_events[KERNEL_EPOLL_MAX];
int32_t kernel_event_num;
- char pad1 __rte_cache_aligned;
+ char pad3 __rte_cache_aligned;
struct netif netif;
struct lstack_dev_ops dev_ops;
@@ -149,36 +151,10 @@ void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
void stack_send_pkts(struct protocol_stack *stack);
-struct rpc_msg;
struct thread_params {
uint16_t queue_id;
uint16_t idx;
};
-void stack_clean_epoll(struct rpc_msg *msg);
-void stack_arp(struct rpc_msg *msg);
-void stack_socket(struct rpc_msg *msg);
-void stack_close(struct rpc_msg *msg);
-void stack_shutdown(struct rpc_msg *msg);
-void stack_bind(struct rpc_msg *msg);
-void stack_listen(struct rpc_msg *msg);
-void stack_accept(struct rpc_msg *msg);
-void stack_connect(struct rpc_msg *msg);
-void stack_recv(struct rpc_msg *msg);
-void stack_getpeername(struct rpc_msg *msg);
-void stack_getsockname(struct rpc_msg *msg);
-void stack_getsockopt(struct rpc_msg *msg);
-void stack_setsockopt(struct rpc_msg *msg);
-void stack_fcntl(struct rpc_msg *msg);
-void stack_ioctl(struct rpc_msg *msg);
-void stack_send(struct rpc_msg *msg);
-void stack_mempool_size(struct rpc_msg *msg);
-void stack_rpcpool_size(struct rpc_msg *msg);
-void stack_create_shadow_fd(struct rpc_msg *msg);
-void stack_replenish_sendring(struct rpc_msg *msg);
-void stack_get_conntable(struct rpc_msg *msg);
-void stack_get_connnum(struct rpc_msg *msg);
-void stack_recvlist_count(struct rpc_msg *msg);
-void stack_exit_by_rpc(struct rpc_msg *msg);
int stack_polling(uint32_t wakeup_tick);
#endif
diff --git a/src/lstack/include/lstack_rpc_proc.h b/src/lstack/include/lstack_rpc_proc.h
new file mode 100644
index 0000000..71f0c58
--- /dev/null
+++ b/src/lstack/include/lstack_rpc_proc.h
@@ -0,0 +1,46 @@
+/*
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+#ifndef __GAZELLE_RPC_PROC_H__
+#define __GAZELLE_RPC_PROC_H__
+#include "lstack_thread_rpc.h"
+
+void stack_clean_epoll(struct rpc_msg *msg);
+void stack_arp(struct rpc_msg *msg);
+void stack_socket(struct rpc_msg *msg);
+void stack_close(struct rpc_msg *msg);
+void stack_shutdown(struct rpc_msg *msg);
+void stack_bind(struct rpc_msg *msg);
+void stack_listen(struct rpc_msg *msg);
+void stack_accept(struct rpc_msg *msg);
+void stack_connect(struct rpc_msg *msg);
+void stack_recv(struct rpc_msg *msg);
+void stack_getpeername(struct rpc_msg *msg);
+void stack_getsockname(struct rpc_msg *msg);
+void stack_getsockopt(struct rpc_msg *msg);
+void stack_setsockopt(struct rpc_msg *msg);
+void stack_fcntl(struct rpc_msg *msg);
+void stack_ioctl(struct rpc_msg *msg);
+void stack_send(struct rpc_msg *msg);
+void stack_mempool_size(struct rpc_msg *msg);
+void stack_rpcpool_size(struct rpc_msg *msg);
+void stack_create_shadow_fd(struct rpc_msg *msg);
+void stack_replenish_sendring(struct rpc_msg *msg);
+void stack_get_conntable(struct rpc_msg *msg);
+void stack_get_connnum(struct rpc_msg *msg);
+void stack_recvlist_count(struct rpc_msg *msg);
+void stack_exit_by_rpc(struct rpc_msg *msg);
+
+void thread_register_phase1(struct rpc_msg *msg);
+void thread_register_phase2(struct rpc_msg *msg);
+
+#endif
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 633ef93..30caa66 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -28,6 +28,12 @@
#define RPC_MSG_MAX 4096
#define RPC_MSG_MASK (RPC_MSG_MAX - 1)
+typedef struct lockless_queue rpc_queue;
+
+struct rpc_stats {
+ uint16_t call_null;
+ uint64_t call_alloc_fail;
+};
struct rpc_msg;
typedef void (*rpc_msg_func)(struct rpc_msg *msg);
@@ -41,7 +47,9 @@ union rpc_msg_arg {
socklen_t socklen;
size_t size;
};
-struct rpc_msg_pool;
+struct rpc_msg_pool {
+ struct rte_mempool *mempool;
+};
struct rpc_msg {
pthread_spinlock_t lock; /* msg handler unlock notice sender msg process done */
int8_t sync_flag : 1;
@@ -54,44 +62,41 @@ struct rpc_msg {
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
};
-struct rpc_msg_pool {
- struct rte_mempool *mempool;
-};
+static inline void rpc_queue_init(rpc_queue *queue)
+{
+ lockless_queue_init(queue);
+}
-struct protocol_stack;
-struct rte_mbuf;
-struct wakeup_poll;
-struct lwip_sock;
-int poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
-void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup);
-int32_t rpc_call_msgcnt(struct protocol_stack *stack);
-int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
-int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
-int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
-int32_t rpc_call_connnum(struct protocol_stack *stack);
-int32_t rpc_call_arp(struct protocol_stack *stack, struct rte_mbuf *mbuf);
-int32_t rpc_call_socket(int32_t domain, int32_t type, int32_t protocol);
-int32_t rpc_call_close(int32_t fd);
-int32_t rpc_call_shutdown(int fd, int how);
-int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_listen(int s, int backlog);
-int32_t rpc_call_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-int32_t rpc_call_connect(int fd, const struct sockaddr *addr, socklen_t addrlen);
-int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags);
-int32_t rpc_call_getpeername(int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockname(int fd, struct sockaddr *addr, socklen_t *addrlen);
-int32_t rpc_call_getsockopt(int fd, int level, int optname, void *optval, socklen_t *optlen);
-int32_t rpc_call_setsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen);
-int32_t rpc_call_fcntl(int fd, int cmd, long val);
-int32_t rpc_call_ioctl(int fd, long cmd, void *argp);
-int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock);
-int32_t rpc_call_mbufpoolsize(struct protocol_stack *stack);
-int32_t rpc_call_rpcpool_size(struct protocol_stack *stack);
-int32_t rpc_call_stack_exit(struct protocol_stack *stack);
+struct rpc_stats *rpc_stats_get(void);
+int32_t rpc_msgcnt(rpc_queue *queue);
+int rpc_poll_msg(rpc_queue *queue, uint32_t max_num);
+void rpc_call_clean_epoll(rpc_queue *queue, void *wakeup);
+int32_t rpc_call_shadow_fd(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
+int32_t rpc_call_recvlistcnt(rpc_queue *queue);
+int32_t rpc_call_thread_regphase1(rpc_queue *queue, void *conn);
+int32_t rpc_call_thread_regphase2(rpc_queue *queue, void *conn);
+int32_t rpc_call_conntable(rpc_queue *queue, void *conn_table, uint32_t max_conn);
+int32_t rpc_call_connnum(rpc_queue *queue);
+int32_t rpc_call_arp(rpc_queue *queue, void *mbuf);
+int32_t rpc_call_socket(rpc_queue *queue, int32_t domain, int32_t type, int32_t protocol);
+int32_t rpc_call_close(rpc_queue *queue, int32_t fd);
+int32_t rpc_call_shutdown(rpc_queue *queue, int fd, int how);
+int32_t rpc_call_bind(rpc_queue *queue, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
+int32_t rpc_call_listen(rpc_queue *queue, int s, int backlog);
+int32_t rpc_call_accept(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+int32_t rpc_call_connect(rpc_queue *queue, int fd, const struct sockaddr *addr, socklen_t addrlen);
+int32_t rpc_call_send(rpc_queue *queue, int fd, const void *buf, size_t len, int flags);
+int32_t rpc_call_getpeername(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int32_t rpc_call_getsockname(rpc_queue *queue, int fd, struct sockaddr *addr, socklen_t *addrlen);
+int32_t rpc_call_getsockopt(rpc_queue *queue, int fd, int level, int optname, void *optval, socklen_t *optlen);
+int32_t rpc_call_setsockopt(rpc_queue *queue, int fd, int level, int optname, const void *optval, socklen_t optlen);
+int32_t rpc_call_fcntl(rpc_queue *queue, int fd, int cmd, long val);
+int32_t rpc_call_ioctl(rpc_queue *queue, int fd, long cmd, void *argp);
+int32_t rpc_call_replenish(rpc_queue *queue, void *sock);
+int32_t rpc_call_mbufpoolsize(rpc_queue *queue);
+int32_t rpc_call_stack_exit(rpc_queue *queue);
-static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
+static inline __attribute__((always_inline)) void rpc_call(rpc_queue *queue, struct rpc_msg *msg)
{
lockless_queue_mpsc_push(queue, &msg->queue_node);
}
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 4d6f620..965de58 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -529,7 +529,7 @@ void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
int ret = -1;
while(ret != 0) {
- ret = rpc_call_arp(stack, mbuf);
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf);
printf("transfer_tcp_to_thread, ret : %d \n", ret);
}
}
@@ -550,10 +550,10 @@ void parse_arp_and_transefer(char* buf)
}
copy_mbuf(mbuf_copy, mbuf);
- ret = rpc_call_arp(stack, mbuf_copy);
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
while (ret != 0) {
- rpc_call_arp(stack, mbuf_copy);;
+ rpc_call_arp(&stack->rpc_queue, mbuf_copy);
}
}
}
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 9f12096..bea0dc7 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -646,8 +646,7 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.stack_stat.call_null);
printf("send_pkts_fail: %-13"PRIu64" ", lstack_stat->data.pkts.stack_stat.send_pkts_fail);
- printf("mbuf_pool_freecnt: %-10"PRIu32" ", lstack_stat->data.pkts.mbufpool_avail_cnt);
- printf("rpc_pool_freecnt: %-12"PRIu32" \n", lstack_stat->data.pkts.rpcpool_avail_cnt);
+ printf("mbuf_pool_freecnt: %-10"PRIu32" \n", lstack_stat->data.pkts.mbufpool_avail_cnt);
printf("accpet_fail: %-16"PRIu64" ", lstack_stat->data.pkts.stack_stat.accept_fail);
printf("sock_rx_drop: %-15"PRIu64" ", lstack_stat->data.pkts.stack_stat.sock_rx_drop);
printf("sock_tx_merge: %-16"PRIu64" \n", lstack_stat->data.pkts.stack_stat.sock_tx_merge);
--
2.27.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。