1 Star 0 Fork 32

吴昌盛/gazelle-tar

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0203-add-pbuf-lock-when-aggregate-pbuf.patch 20.62 KB
一键复制 编辑 原始数据 按行查看 历史
jinag12 提交于 2023-03-18 15:13 . syn add pbuf lock when aggregate pbuf
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
From c095a3145da08e976bb6e1e2a2abc6bc3d3aac31 Mon Sep 17 00:00:00 2001
From: jianheng <jiangheng14@huawei.com>
Date: Sun, 12 Mar 2023 01:30:34 +0800
Subject: [PATCH] add pbuf lock when aggregate pbuf reduce invalid send rpc
count
---
src/common/gazelle_dfx_msg.h | 1 -
src/lstack/core/lstack_cfg.c | 8 --
src/lstack/core/lstack_lwip.c | 150 ++++++++-------------
src/lstack/core/lstack_protocol_stack.c | 4 -
src/lstack/core/lstack_stack_stat.c | 3 -
src/lstack/core/lstack_thread_rpc.c | 46 +------
src/lstack/include/lstack_cfg.h | 1 -
src/lstack/include/lstack_lwip.h | 1 -
src/lstack/include/lstack_protocol_stack.h | 1 -
src/lstack/include/lstack_thread_rpc.h | 25 +++-
src/lstack/lstack.conf | 2 -
src/ltran/ltran_dfx.c | 1 -
12 files changed, 87 insertions(+), 156 deletions(-)
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index 674f2d7..608a023 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -81,7 +81,6 @@ struct gazelle_stat_pkts {
uint16_t conn_num;
uint64_t recv_list_cnt;
uint64_t call_alloc_fail;
- uint64_t send_list_cnt;
uint32_t mempool_freecnt;
struct gazelle_stack_stat stack_stat;
struct gazelle_wakeup_stat wakeup_stat;
diff --git a/src/lstack/core/lstack_cfg.c b/src/lstack/core/lstack_cfg.c
index 86d0f14..9195f34 100644
--- a/src/lstack/core/lstack_cfg.c
+++ b/src/lstack/core/lstack_cfg.c
@@ -58,7 +58,6 @@ static int32_t parse_listen_shadow(void);
static int32_t parse_app_bind_numa(void);
static int32_t parse_main_thread_affinity(void);
static int32_t parse_unix_prefix(void);
-static int32_t parse_send_connect_number(void);
static int32_t parse_read_connect_number(void);
static int32_t parse_rpc_number(void);
static int32_t parse_nic_read_number(void);
@@ -108,7 +107,6 @@ static struct config_vector_t g_config_tbl[] = {
{ "unix_prefix", parse_unix_prefix },
{ "tcp_conn_count", parse_tcp_conn_count },
{ "mbuf_count_per_conn", parse_mbuf_count_per_conn },
- { "send_connect_number", parse_send_connect_number },
{ "read_connect_number", parse_read_connect_number },
{ "rpc_number", parse_rpc_number },
{ "nic_read_number", parse_nic_read_number },
@@ -734,12 +732,6 @@ static int32_t parse_mbuf_count_per_conn(void)
MBUF_COUNT_PER_CONN, 1, INT32_MAX);
}
-static int32_t parse_send_connect_number(void)
-{
- return parse_int(&g_config_params.send_connect_number, "send_connect_number",
- STACK_THREAD_DEFAULT, 1, INT32_MAX);
-}
-
static int32_t parse_read_connect_number(void)
{
return parse_int(&g_config_params.read_connect_number, "read_connect_number",
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index dcd7e05..e83bffa 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -94,7 +94,7 @@ static void reset_sock_data(struct lwip_sock *sock)
sock->listen_next = NULL;
sock->epoll_events = 0;
sock->events = 0;
- sock->in_send = 0;
+ sock->call_num = 0;
sock->remain_len = 0;
if (sock->recv_lastdata) {
@@ -117,9 +117,10 @@ static struct pbuf *init_mbuf_to_pbuf(struct rte_mbuf *mbuf, pbuf_layer layer, u
pbuf->l4_len = 0;
pbuf->header_off = 0;
pbuf->rexmit = 0;
- pbuf->in_write = 0;
+ pbuf->allow_in = 1;
pbuf->head = 0;
pbuf->last = pbuf;
+ pthread_spin_init(&pbuf->pbuf_lock, PTHREAD_PROCESS_SHARED);
}
return pbuf;
@@ -193,7 +194,6 @@ void gazelle_init_sock(int32_t fd)
sock->stack->conn_num++;
init_list_node_null(&sock->recv_list);
init_list_node_null(&sock->event_list);
- init_list_node_null(&sock->send_list);
}
void gazelle_clean_sock(int32_t fd)
@@ -214,7 +214,6 @@ void gazelle_clean_sock(int32_t fd)
reset_sock_data(sock);
list_del_node_null(&sock->recv_list);
- list_del_node_null(&sock->send_list);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -264,11 +263,16 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
if (unlikely(sock->send_pre_del)) {
pbuf = sock->send_pre_del;
- if (pbuf->tot_len > remain_size ||
- (pbuf->head && __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE))) {
+ pthread_spin_lock(&pbuf->pbuf_lock);
+ if (pbuf->tot_len > remain_size) {
+ pthread_spin_unlock(&pbuf->pbuf_lock);
*apiflags &= ~TCP_WRITE_FLAG_MORE;
return NULL;
}
+ if (pbuf->allow_in == 1) {
+ __sync_fetch_and_sub(&pbuf->allow_in, 1);
+ }
+ pthread_spin_unlock(&pbuf->pbuf_lock);
if (pbuf->next) {
sock->send_lastdata = pbuf->next;
@@ -295,10 +299,24 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
}
sock->send_pre_del = pbuf;
- if (pbuf->tot_len > remain_size || __atomic_load_n(&pbuf->in_write, __ATOMIC_ACQUIRE)) {
- *apiflags &= ~TCP_WRITE_FLAG_MORE;
- pbuf->head = 1;
- return NULL;
+ if (!gazelle_ring_readover_count(sock->send_ring)) {
+ pthread_spin_lock(&pbuf->pbuf_lock);
+ if (pbuf->tot_len > remain_size) {
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ pbuf->head = 1;
+ return NULL;
+ }
+ if(pbuf->allow_in == 1){
+ __sync_fetch_and_sub(&pbuf->allow_in, 1);
+ }
+ pthread_spin_unlock(&pbuf->pbuf_lock);
+ } else {
+ if (pbuf->tot_len > remain_size) {
+ *apiflags &= ~TCP_WRITE_FLAG_MORE;
+ pbuf->head = 1;
+ return NULL;
+ }
}
sock->send_lastdata = pbuf->next;
@@ -442,18 +460,17 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
struct pbuf *last_pbuf = NULL;
volatile uint32_t tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
uint32_t last = r->prod.tail - 1;
- if (last == tail || last - tail > r->capacity) {
+ if (last + 1 == tail || last + 1 - tail > r->capacity) {
return NULL;
}
__rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1);
- __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE);
-
- rte_mb();
- tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
- if (last == tail || last - tail > r->capacity) {
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+ if (pthread_spin_trylock(&last_pbuf->pbuf_lock) != 0) {
+ return NULL;
+ }
+ if (last_pbuf->allow_in != 1) {
+ pthread_spin_unlock(&last_pbuf->pbuf_lock);
return NULL;
}
@@ -462,7 +479,7 @@ static inline struct pbuf *gazelle_ring_readlast(struct rte_ring *r)
static inline void gazelle_ring_lastover(struct pbuf *last_pbuf)
{
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
+ pthread_spin_unlock(&last_pbuf->pbuf_lock);
}
static inline size_t merge_data_lastpbuf(struct lwip_sock *sock, void *buf, size_t len)
@@ -623,56 +640,28 @@ void stack_send(struct rpc_msg *msg)
{
int32_t fd = msg->args[MSG_ARG_0].i;
struct protocol_stack *stack = (struct protocol_stack *)msg->args[MSG_ARG_3].p;
+ bool replenish_again;
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
msg->result = -1;
+ LSTACK_LOG(ERR, LSTACK, "stack_send: sock error!\n");
+ rpc_msg_free(msg);
return;
}
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
- rte_mb();
-
- /* have remain data or replenish again add sendlist */
- if (sock->errevent == 0 && NETCONN_IS_DATAOUT(sock)) {
- if (list_is_null(&sock->send_list)) {
- list_add_node(&stack->send_list, &sock->send_list);
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- }
- }
-}
-
-void send_stack_list(struct protocol_stack *stack, uint32_t send_max)
-{
- struct list_node *node, *temp;
- struct lwip_sock *sock;
- uint32_t read_num = 0;
- bool replenish_again;
-
- list_for_each_safe(node, temp, &stack->send_list) {
- sock = container_of(node, struct lwip_sock, send_list);
-
- if (++read_num > send_max) {
- /* list head move to next send */
- list_del_node(&stack->send_list);
- list_add_node(&sock->send_list, &stack->send_list);
- break;
- }
-
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
- rte_mb();
-
- if (sock->conn == NULL || sock->errevent > 0) {
- list_del_node_null(&sock->send_list);
- continue;
- }
-
- replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0);
-
- if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) {
- list_del_node_null(&sock->send_list);
+ replenish_again = do_lwip_send(stack, sock->conn->socket, sock, 0);
+ __sync_fetch_and_sub(&sock->call_num, 1);
+ if (!NETCONN_IS_DATAOUT(sock) && !replenish_again) {
+ rpc_msg_free(msg);
+ return;
+ } else {
+ if(__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) == 0){
+ rpc_call(&stack->rpc_queue, msg);
+ __sync_fetch_and_add(&sock->call_num, 1);
} else {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
+ rpc_msg_free(msg);
+ return;
}
}
}
@@ -686,29 +675,6 @@ static inline void free_recv_ring_readover(struct rte_ring *ring)
}
}
-static inline struct pbuf *gazelle_ring_enqueuelast(struct rte_ring *r)
-{
- struct pbuf *last_pbuf = NULL;
- volatile uint32_t head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
- uint32_t last = r->cons.head - 1;
- if (last == head || last - head > r->capacity) {
- return NULL;
- }
-
- __rte_ring_dequeue_elems(r, last, (void **)&last_pbuf, sizeof(void *), 1);
- __atomic_store_n(&last_pbuf->in_write, 1, __ATOMIC_RELEASE);
-
- rte_mb();
-
- head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE);
- if (last == head || last - head > r->capacity) {
- __atomic_store_n(&last_pbuf->in_write, 0, __ATOMIC_RELEASE);
- return NULL;
- }
-
- return last_pbuf;
-}
-
static inline struct pbuf *pbuf_last(struct pbuf *pbuf)
{
while (pbuf->next) {
@@ -824,11 +790,14 @@ ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags)
static inline void notice_stack_send(struct lwip_sock *sock, int32_t fd, int32_t len, int32_t flags)
{
- if (__atomic_load_n(&sock->in_send, __ATOMIC_ACQUIRE) == 0) {
- __atomic_store_n(&sock->in_send, 1, __ATOMIC_RELEASE);
- if (rpc_call_send(fd, NULL, len, flags) != 0) {
- __atomic_store_n(&sock->in_send, 0, __ATOMIC_RELEASE);
+ if(__atomic_load_n(&sock->call_num, __ATOMIC_ACQUIRE) >= 2){
+ ;;
+ } else {
+ while (rpc_call_send(fd, NULL, len, flags) < 0) {
+ usleep(1000); // wait 1ms to exec again
+ LSTACK_LOG(INFO, LSTACK, "rpc_call_send failed, try again\n");
}
+ __sync_fetch_and_add(&sock->call_num, 1);
}
}
@@ -1240,13 +1209,6 @@ void stack_mempool_size(struct rpc_msg *msg)
msg->result = rte_mempool_avail_count(stack->rxtx_pktmbuf_pool);
}
-void stack_sendlist_count(struct rpc_msg *msg)
-{
- struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p;
-
- msg->result = get_list_count(&stack->send_list);
-}
-
void stack_recvlist_count(struct rpc_msg *msg)
{
struct protocol_stack *stack = (struct protocol_stack*)msg->args[MSG_ARG_0].p;
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index e13034f..300d7af 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -279,7 +279,6 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
stack->lwip_stats = &lwip_stats;
init_list_node(&stack->recv_list);
- init_list_node(&stack->send_list);
init_list_node(&stack->wakeup_list);
sys_calibrate_tsc();
@@ -417,7 +416,6 @@ static void* gazelle_stack_thread(void *arg)
struct cfg_params *cfg = get_global_cfg_params();
bool use_ltran_flag = cfg->use_ltran;;
bool kni_switch = cfg->kni_switch;
- uint32_t send_connect_number = cfg->send_connect_number;
uint32_t read_connect_number = cfg->read_connect_number;
uint32_t rpc_number = cfg->rpc_number;
uint32_t nic_read_number = cfg->nic_read_number;
@@ -441,8 +439,6 @@ static void* gazelle_stack_thread(void *arg)
for (;;) {
poll_rpc_msg(stack, rpc_number);
- send_stack_list(stack, send_connect_number);
-
gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
read_recv_list(stack, read_connect_number);
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 75322d5..a39e372 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -152,9 +152,6 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
rpc_call_result = rpc_call_recvlistcnt(stack);
dfx->data.pkts.recv_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
- rpc_call_result = rpc_call_sendlistcnt(stack);
- dfx->data.pkts.send_list_cnt = (rpc_call_result < 0) ? 0 : rpc_call_result;
-
dfx->data.pkts.conn_num = stack->conn_num;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index 08fe20d..fe3b757 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -23,14 +23,6 @@
#include "lstack_dpdk.h"
#include "lstack_thread_rpc.h"
-#define RPC_MSG_MAX 512
-#define RPC_MSG_MASK (RPC_MSG_MAX - 1)
-struct rpc_msg_pool {
- struct rpc_msg msgs[RPC_MSG_MAX];
- uint32_t prod __rte_cache_aligned;
- uint32_t cons __rte_cache_aligned;
-};
-
static PER_THREAD struct rpc_msg_pool *g_rpc_pool = NULL;
static inline __attribute__((always_inline)) struct rpc_msg *get_rpc_msg(struct rpc_msg_pool *rpc_pool)
@@ -76,21 +68,6 @@ static struct rpc_msg *rpc_msg_alloc(struct protocol_stack *stack, rpc_msg_func
return msg;
}
-static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
-{
- pthread_spin_destroy(&msg->lock);
-
- msg->self_release = 0;
- msg->func = NULL;
-
- atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1);
-}
-
-static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
-{
- lockless_queue_mpsc_push(queue, &msg->queue_node);
-}
-
static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queue *queue, struct rpc_msg *msg)
{
int32_t ret;
@@ -124,10 +101,13 @@ void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num)
stack->stats.call_null++;
}
- if (msg->self_release) {
- pthread_spin_unlock(&msg->lock);
- } else {
- rpc_msg_free(msg);
+ /* stack_send free msg in stack_send */
+ if (msg->func != stack_send) {
+ if (msg->self_release) {
+ pthread_spin_unlock(&msg->lock);
+ } else {
+ rpc_msg_free(msg);
+ }
}
}
}
@@ -217,18 +197,6 @@ int32_t rpc_call_mempoolsize(struct protocol_stack *stack)
return rpc_sync_call(&stack->rpc_queue, msg);
}
-int32_t rpc_call_sendlistcnt(struct protocol_stack *stack)
-{
- struct rpc_msg *msg = rpc_msg_alloc(stack, stack_sendlist_count);
- if (msg == NULL) {
- return -1;
- }
-
- msg->args[MSG_ARG_0].p = stack;
-
- return rpc_sync_call(&stack->rpc_queue, msg);
-}
-
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 5f16c19..2705fee 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -76,7 +76,6 @@ struct cfg_params {
uint32_t lpm_pkts_in_detect;
uint32_t tcp_conn_count;
uint32_t mbuf_count_per_conn;
- uint32_t send_connect_number;
uint32_t read_connect_number;
uint32_t rpc_number;
uint32_t nic_read_number;
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 6f5b4f4..02110e0 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -35,7 +35,6 @@ ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, uint8_t apiflags);
void read_recv_list(struct protocol_stack *stack, uint32_t max_num);
void send_stack_list(struct protocol_stack *stack, uint32_t send_max);
void add_recv_list(int32_t fd);
-void stack_sendlist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 795db39..11b001c 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -68,7 +68,6 @@ struct protocol_stack {
struct rte_mbuf *pkts[RTE_TEST_RX_DESC_DEFAULT];
struct list_node recv_list;
- struct list_node send_list;
struct list_node wakeup_list;
volatile uint16_t conn_num;
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index aff30dc..ed111fb 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -24,6 +24,10 @@
#define MSG_ARG_3 (3)
#define MSG_ARG_4 (4)
#define RPM_MSG_ARG_SIZE (5)
+
+#define RPC_MSG_MAX 512
+#define RPC_MSG_MASK (RPC_MSG_MAX - 1)
+
struct rpc_msg;
typedef void (*rpc_msg_func)(struct rpc_msg *msg);
union rpc_msg_arg {
@@ -48,6 +52,12 @@ struct rpc_msg {
union rpc_msg_arg args[RPM_MSG_ARG_SIZE]; /* resolve by type */
};
+struct rpc_msg_pool {
+ struct rpc_msg msgs[RPC_MSG_MAX];
+ uint32_t prod __rte_cache_aligned;
+ uint32_t cons __rte_cache_aligned;
+};
+
struct protocol_stack;
struct rte_mbuf;
struct wakeup_poll;
@@ -57,7 +67,6 @@ void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wake
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
-int32_t rpc_call_sendlistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
@@ -79,4 +88,18 @@ int32_t rpc_call_ioctl(int fd, long cmd, void *argp);
int32_t rpc_call_replenish(struct protocol_stack *stack, struct lwip_sock *sock);
int32_t rpc_call_mempoolsize(struct protocol_stack *stack);
+static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
+{
+ lockless_queue_mpsc_push(queue, &msg->queue_node);
+}
+
+static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *msg)
+{
+ pthread_spin_destroy(&msg->lock);
+
+ msg->self_release = 0;
+
+ __atomic_fetch_add((_Atomic uint32_t *)&msg->pool->cons, 1, __ATOMIC_SEQ_CST);
+}
+
#endif
diff --git a/src/lstack/lstack.conf b/src/lstack/lstack.conf
index a4571ff..cf81954 100644
--- a/src/lstack/lstack.conf
+++ b/src/lstack/lstack.conf
@@ -28,8 +28,6 @@ send_ring_size = 256
expand_send_ring = 0
#protocol stack thread per loop params
-#send connect to nic
-send_connect_number = 4
#read data form protocol stack into recv_ring
read_connect_number = 4
#process rpc msg number
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 1c9d4fa..051787e 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -572,7 +572,6 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("write_lwip: %-17"PRIu64" ", lstack_stat->data.pkts.stack_stat.write_lwip_cnt);
printf("app_write_rpc: %-14"PRIu64" \n", lstack_stat->data.pkts.wakeup_stat.app_write_rpc);
printf("recv_list: %-18"PRIu64" ", lstack_stat->data.pkts.recv_list_cnt);
- printf("send_list: %-18"PRIu64" ", lstack_stat->data.pkts.send_list_cnt);
printf("conn_num: %-19hu \n", lstack_stat->data.pkts.conn_num);
printf("wakeup_events: %-14"PRIu64" ", lstack_stat->data.pkts.stack_stat.wakeup_events);
printf("app_events: %-17"PRIu64" ", lstack_stat->data.pkts.wakeup_stat.app_events);
--
2.23.0
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wu-changsheng/gazelle-tar.git
git@gitee.com:wu-changsheng/gazelle-tar.git
wu-changsheng
gazelle-tar
gazelle-tar
master

搜索帮助