代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From fadeb43a653ab5da503b7030b60b4e063f0b3aef Mon Sep 17 00:00:00 2001
From: wuchangsheng <wuchangsheng2@huawei.com>
Date: Sun, 13 Mar 2022 22:57:44 +0800
Subject: [PATCH 24/34] refactor event
---
src/common/gazelle_dfx_msg.h | 10 +++-
src/lstack/api/lstack_epoll.c | 73 +++++++++++++++++---------
src/lstack/core/lstack_control_plane.c | 13 +++++
src/lstack/core/lstack_lwip.c | 76 ++++++++++++++++++++-------
src/lstack/core/lstack_protocol_stack.c | 36 ++++++++++---
src/lstack/core/lstack_stack_stat.c | 4 ++
src/lstack/core/lstack_thread_rpc.c | 42 +++++++++++++++
src/lstack/include/lstack_lwip.h | 3 ++
src/lstack/include/lstack_protocol_stack.h | 2 +
src/lstack/include/lstack_thread_rpc.h | 3 ++
src/lstack/include/lstack_weakup.h | 84 +++++++++++++++++++++---------
src/ltran/ltran_dfx.c | 36 ++++++++-----
12 files changed, 290 insertions(+), 92 deletions(-)
diff --git a/src/common/gazelle_dfx_msg.h b/src/common/gazelle_dfx_msg.h
index cea4200..ae20436 100644
--- a/src/common/gazelle_dfx_msg.h
+++ b/src/common/gazelle_dfx_msg.h
@@ -64,10 +64,12 @@ struct gazelle_stat_pkts {
uint64_t rx_drop;
uint64_t rx_allocmbuf_fail;
uint64_t tx_allocmbuf_fail;
- uint16_t weakup_ring_cnt;
uint64_t call_msg_cnt;
+ uint16_t weakup_ring_cnt;
uint16_t conn_num;
uint16_t send_idle_ring_cnt;
+ uint64_t event_list;
+ uint64_t wakeup_list;
uint64_t read_lwip_drop;
uint64_t read_lwip_cnt;
uint64_t write_lwip_drop;
@@ -89,6 +91,10 @@ struct gazelle_stat_pkts {
uint64_t send_self_rpc;
uint64_t call_null;
uint64_t arp_copy_fail;
+ uint64_t epoll_pending;
+ uint64_t epoll_pending_call;
+ uint64_t epoll_self_call;
+ uint64_t epoll_self_event;
};
/* same as define in lwip/stats.h - struct stats_mib2 */
@@ -162,6 +168,8 @@ struct gazelle_stat_lstack_conn_info {
uint32_t send_ring_cnt;
uint32_t recv_ring_cnt;
uint32_t tcp_sub_state;
+ uint32_t event_ring_cnt;
+ uint32_t self_ring_cnt;
};
struct gazelle_stat_lstack_conn {
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index bcbb35e..a686ddb 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -87,19 +87,27 @@ void add_epoll_event(struct netconn *conn, uint32_t event)
return;
}
- sock->have_event = true;
- weakup_enqueue(sock->stack->weakup_ring, sock);
- sock->stack->stats.weakup_events++;
+ if (weakup_enqueue(sock->stack->weakup_ring, sock)) {
+ if (list_is_empty(&sock->event_list)) {
+ list_add_node(&sock->stack->event_list, &sock->event_list);
+ }
+ } else {
+ sock->have_event = true;
+ sock->stack->stats.weakup_events++;
+ }
}
static void raise_pending_events(struct lwip_sock *sock)
{
- if (!sock->conn) {
+ struct weakup_poll *wakeup = sock->weakup;
+ struct protocol_stack *stack = sock->stack;
+ struct netconn *conn = sock->conn;
+ if (wakeup == NULL || stack == NULL || conn == NULL) {
return;
}
struct lwip_sock *attach_sock = NULL;
- if (sock->attach_fd > 0 && sock->attach_fd != sock->conn->socket) {
+ if (sock->attach_fd > 0 && sock->attach_fd != conn->socket) {
attach_sock = get_socket_by_fd(sock->attach_fd);
if (attach_sock == NULL) {
return;
@@ -108,7 +116,10 @@ static void raise_pending_events(struct lwip_sock *sock)
attach_sock = sock;
}
- struct netconn *conn = attach_sock->conn;
+ conn = attach_sock->conn;
+ if (conn == NULL) {
+ return;
+ }
struct tcp_pcb *tcp = conn->pcb.tcp;
if ((tcp == NULL) || (tcp->state < ESTABLISHED)) {
return;
@@ -132,10 +143,17 @@ static void raise_pending_events(struct lwip_sock *sock)
event |= POLLERR | POLLIN;
}
- if (event != 0) {
- sock->events |= event;
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
- sem_post(&sock->weakup->event_sem);
+ if (event == 0) {
+ return;
+ }
+ sock->events |= event;
+ if (rte_ring_mp_enqueue(wakeup->event_ring, (void *)sock) == 0 ||
+ rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
+ sem_post(&wakeup->event_sem);
+ stack->stats.epoll_pending++;
+ } else {
+ rpc_call_addevent(stack, sock);
+ stack->stats.epoll_pending_call++;
}
}
@@ -168,6 +186,12 @@ int32_t lstack_epoll_create(int32_t size)
GAZELLE_RETURN(ENOMEM);
}
+ weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, fd);
+ if (weakup->self_ring == NULL) {
+ posix_api->close_fn(fd);
+ GAZELLE_RETURN(ENOMEM);
+ }
+
sock->weakup = weakup;
return fd;
@@ -247,11 +271,6 @@ static inline int32_t save_poll_event(struct pollfd *fds, uint32_t maxevents, st
static bool remove_event(enum POLL_TYPE etype, struct lwip_sock **sock_list, int32_t event_num, struct lwip_sock *sock)
{
- /* close sock */
- if (sock->stack == NULL) {
- return true;
- }
-
/* remove duplicate event */
for (uint32_t i = 0; i < event_num && etype == TYPE_EPOLL; i++) {
if (sock_list[i] == sock) {
@@ -267,29 +286,26 @@ static int32_t get_lwip_events(struct weakup_poll *weakup, void *out, uint32_t m
struct epoll_event *events = (struct epoll_event *)out;
struct pollfd *fds = (struct pollfd *)out;
- uint32_t events_cnt = rte_ring_count(weakup->event_ring);
- if (events_cnt == 0) {
- return 0;
- }
if (etype == TYPE_EPOLL) {
maxevents = LWIP_MIN(EPOLL_MAX_EVENTS, maxevents);
}
- events_cnt = LWIP_MIN(events_cnt, maxevents);
int32_t event_num = 0;
struct lwip_sock *sock = NULL;
- while (event_num < events_cnt) {
- int32_t ret = rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock);
- if (ret != 0) {
+ while (event_num < maxevents) {
+ if (rte_ring_sc_dequeue(weakup->self_ring, (void **)&sock) &&
+ rte_ring_sc_dequeue(weakup->event_ring, (void **)&sock)) {
break;
}
+ /* close sock */
+ if (sock->stack == NULL) {
+ return true;
+ }
sock->have_event = false;
if (remove_event(etype, weakup->sock_list, event_num, sock)) {
- if (sock->stack) {
- sock->stack->stats.remove_event++;
- }
+ sock->stack->stats.remove_event++;
continue;
}
@@ -390,6 +406,11 @@ static int32_t poll_init(struct pollfd *fds, nfds_t nfds, struct weakup_poll *we
if (weakup->event_ring == NULL) {
GAZELLE_RETURN(ENOMEM);
}
+
+ weakup->self_ring = create_ring("SELF_EVENT", VDEV_EVENT_QUEUE_SZ, RING_F_SC_DEQ, rte_gettid());
+ if (weakup->self_ring == NULL) {
+ GAZELLE_RETURN(ENOMEM);
+ }
}
for (uint32_t i = 0; i < nfds; i++) {
diff --git a/src/lstack/core/lstack_control_plane.c b/src/lstack/core/lstack_control_plane.c
index a7e084d..c782d51 100644
--- a/src/lstack/core/lstack_control_plane.c
+++ b/src/lstack/core/lstack_control_plane.c
@@ -38,6 +38,7 @@
#define RECONNECT_TO_LTRAN_DELAY (1)
#define GAZELLE_BADFD (-1)
#define GAZELLE_LISTEN_BACKLOG 5
+#define GAZELLE_10MS (10000)
static int32_t g_data_fd = -1;
static volatile bool g_register_state = true;
@@ -701,6 +702,12 @@ void control_server_thread(void *arg)
int32_t num, connfd;
struct epoll_event evt_array;
while (1) {
+ /* wait init finish */
+ if (posix_api->is_chld) {
+ usleep(GAZELLE_10MS);
+ continue;
+ }
+
num = posix_api->epoll_wait_fn(epfd, &evt_array, 1, -1);
if (num <= 0) {
continue;
@@ -741,6 +748,12 @@ void control_client_thread(void *arg)
}
while (1) {
+ /* wait init finish */
+ if (posix_api->is_chld) {
+ usleep(GAZELLE_10MS);
+ continue;
+ }
+
if (sockfd < 0) {
set_register_state(false);
sockfd = client_reg_proc_reconnect(epfd);
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index b157517..d55f1e6 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -110,6 +110,8 @@ void gazelle_init_sock(int32_t fd)
init_list_node(&sock->recv_list);
init_list_node(&sock->attach_list);
init_list_node(&sock->listen_list);
+ init_list_node(&sock->event_list);
+ init_list_node(&sock->wakeup_list);
}
void gazelle_clean_sock(int32_t fd)
@@ -126,6 +128,8 @@ void gazelle_clean_sock(int32_t fd)
list_del_node_init(&sock->recv_list);
list_del_node_init(&sock->attach_list);
list_del_node_init(&sock->listen_list);
+ list_del_node_init(&sock->event_list);
+ list_del_node_init(&sock->wakeup_list);
}
void gazelle_free_pbuf(struct pbuf *pbuf)
@@ -266,6 +270,30 @@ ssize_t write_lwip_data(struct lwip_sock *sock, int32_t fd, int32_t flags)
return (send_ret < 0) ? send_ret : send_len;
}
+void add_self_event(struct lwip_sock *sock, uint32_t events)
+{
+ struct weakup_poll *wakeup = sock->weakup;
+ struct protocol_stack *stack = sock->stack;
+ if (wakeup == NULL || stack == NULL) {
+ return;
+ }
+
+ sock->events |= events;
+
+ if (sock->have_event) {
+ return;
+ }
+
+ if (rte_ring_mp_enqueue(wakeup->self_ring, (void *)sock) == 0) {
+ sock->have_event = true;
+ sem_post(&sock->weakup->event_sem);
+ stack->stats.epoll_self_event++;
+ } else {
+ rpc_call_addevent(stack, sock);
+ stack->stats.epoll_self_call++;
+ }
+}
+
ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
{
uint32_t free_count = rte_ring_free_count(sock->send_ring);
@@ -303,14 +331,10 @@ ssize_t write_stack_data(struct lwip_sock *sock, const void *buf, size_t len)
send_pkt++;
}
- if (!sock->have_event && (sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
- sock->have_event = true;
- sock->events |= EPOLLOUT;
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
- sem_post(&sock->weakup->event_sem);
+ if ((sock->epoll_events & EPOLLOUT) && NETCONN_IS_DATAOUT(sock)) {
+ add_self_event(sock, EPOLLOUT);
sock->stack->stats.write_events++;
- }
- if (!NETCONN_IS_DATAOUT(sock)) {
+ } else {
sock->events &= ~EPOLLOUT;
}
@@ -507,14 +531,10 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags)
}
}
- if (!sock->have_event && (sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
- sock->have_event = true;
- sock->events |= EPOLLIN;
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
- sem_post(&sock->weakup->event_sem);
+ if ((sock->epoll_events & EPOLLIN) && NETCONN_IS_DATAIN(sock)) {
+ add_self_event(sock, EPOLLIN);
sock->stack->stats.read_events++;
- }
- if (!NETCONN_IS_DATAIN(sock)) {
+ } else {
sock->events &= ~EPOLLIN;
}
@@ -577,9 +597,14 @@ static void copy_pcb_to_conn(struct gazelle_stat_lstack_conn_info *conn, const s
conn->recv_cnt = rte_ring_count(netconn->recvmbox->ring);
struct lwip_sock *sock = get_socket(netconn->socket);
- if (sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
+ if (netconn->socket > 0 && sock != NULL && sock->recv_ring != NULL && sock->send_ring != NULL) {
conn->recv_ring_cnt = rte_ring_count(sock->recv_ring);
conn->send_ring_cnt = rte_ring_count(sock->send_ring);
+ struct weakup_poll *weakup = sock->weakup;
+ if (weakup) {
+ conn->event_ring_cnt = rte_ring_count(weakup->event_ring);
+ conn->self_ring_cnt = rte_ring_count(weakup->self_ring);
+ }
}
}
}
@@ -696,10 +721,8 @@ void get_lwip_connnum(struct rpc_msg *msg)
msg->result = conn_num;
}
-void stack_recvlist_count(struct rpc_msg *msg)
+static uint32_t get_list_count(struct list_node *list)
{
- struct protocol_stack *stack = get_protocol_stack();
- struct list_node *list = &(stack->recv_list);
struct list_node *node, *temp;
uint32_t count = 0;
@@ -707,5 +730,20 @@ void stack_recvlist_count(struct rpc_msg *msg)
count++;
}
- msg->result = count;
+ return count;
+}
+
+void stack_wakeuplist_count(struct rpc_msg *msg)
+{
+ msg->result = get_list_count(get_protocol_stack()->wakeup_list);
+}
+
+void stack_eventlist_count(struct rpc_msg *msg)
+{
+ msg->result = get_list_count(&get_protocol_stack()->event_list);
+}
+
+void stack_recvlist_count(struct rpc_msg *msg)
+{
+ msg->result = get_list_count(&get_protocol_stack()->recv_list);
}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index c88f902..45649fe 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -193,6 +193,7 @@ int32_t init_protocol_stack(void)
init_list_node(&stack->recv_list);
init_list_node(&stack->listen_list);
+ init_list_node(&stack->event_list);
stack_group->stacks[i] = stack;
}
@@ -261,8 +262,14 @@ static void* gazelle_weakup_thread(void *arg)
thread_affinity_init(lcore_id);
LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
+ struct list_node wakeup_list;
+ init_list_node(&wakeup_list);
+ stack->wakeup_list = &wakeup_list;
+
for (;;) {
- weakup_thread(stack->weakup_ring);
+ wakeup_list_sock(&wakeup_list);
+
+ weakup_thread(stack->weakup_ring, &wakeup_list);
}
return NULL;
@@ -307,6 +314,24 @@ static void stack_thread_init(struct protocol_stack *stack)
LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
}
+static void report_stack_event(struct protocol_stack *stack)
+{
+ struct list_node *list = &(stack->event_list);
+ struct list_node *node, *temp;
+ struct lwip_sock *sock;
+
+ list_for_each_safe(node, temp, list) {
+ sock = container_of(node, struct lwip_sock, event_list);
+
+ if (weakup_enqueue(stack->weakup_ring, sock) == 0) {
+ list_del_node_init(&sock->event_list);
+ stack->stats.weakup_events++;
+ } else {
+ break;
+ }
+ }
+}
+
static void* gazelle_stack_thread(void *arg)
{
struct protocol_stack *stack = (struct protocol_stack *)arg;
@@ -321,6 +346,8 @@ static void* gazelle_stack_thread(void *arg)
read_recv_list();
sys_timer_run();
+
+ report_stack_event(stack);
}
return NULL;
@@ -737,11 +764,8 @@ int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *add
}
struct lwip_sock *sock = get_socket(head_fd);
- if (!sock->have_event && have_accept_event(head_fd)) {
- sock->have_event = true;
- sock->events |= EPOLLIN;
- rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
- sem_post(&sock->weakup->event_sem);
+ if (have_accept_event(head_fd)) {
+ add_self_event(sock, EPOLLIN);
sock->stack->stats.accept_events++;
}
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index 41fe9bf..b7b94e2 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -109,6 +109,10 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
dfx->data.pkts.send_idle_ring_cnt = rte_ring_count(stack->send_idle_ring);
dfx->data.pkts.call_msg_cnt = rpc_call_msgcnt(stack);
dfx->data.pkts.recv_list = rpc_call_recvlistcnt(stack);
+ dfx->data.pkts.event_list = rpc_call_eventlistcnt(stack);
+ if (stack->wakeup_list) {
+ dfx->data.pkts.wakeup_list = rpc_call_eventlistcnt(stack);
+ }
dfx->data.pkts.conn_num = stack->conn_num;
}
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index b3665a7..2fb24b4 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -177,6 +177,26 @@ int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn)
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
}
+int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_wakeuplist_count);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+}
+
+int32_t rpc_call_eventlistcnt(struct protocol_stack *stack)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_eventlist_count);
+ if (msg == NULL) {
+ return -1;
+ }
+
+ return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
+}
+
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
{
struct rpc_msg *msg = rpc_msg_alloc(stack, stack_recvlist_count);
@@ -187,6 +207,28 @@ int32_t rpc_call_recvlistcnt(struct protocol_stack *stack)
return rpc_sync_call(&stack->rpc_queue, stack->rpc_pool, msg);
}
+void add_epoll_event(struct netconn *conn, uint32_t event);
+static void rpc_add_event(struct rpc_msg *msg)
+{
+ struct lwip_sock *sock = (struct lwip_sock *)msg->args[MSG_ARG_0].p;
+ if (sock->conn) {
+ add_epoll_event(sock->conn, sock->events);
+ }
+}
+
+void rpc_call_addevent(struct protocol_stack *stack, void *sock)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, rpc_add_event);
+ if (msg == NULL) {
+ return;
+ }
+
+ msg->args[MSG_ARG_0].p = sock;
+
+ msg->self_release = 0;
+ rpc_call(&stack->rpc_queue, msg);
+}
+
static void rpc_replenish_idlembuf(struct rpc_msg *msg)
{
struct protocol_stack *stack = get_protocol_stack();
diff --git a/src/lstack/include/lstack_lwip.h b/src/lstack/include/lstack_lwip.h
index 581b9fe..87442cd 100644
--- a/src/lstack/include/lstack_lwip.h
+++ b/src/lstack/include/lstack_lwip.h
@@ -33,6 +33,8 @@ ssize_t read_stack_data(int32_t fd, void *buf, size_t len, int32_t flags);
ssize_t read_lwip_data(struct lwip_sock *sock, int32_t flags, u8_t apiflags);
void read_recv_list(void);
void add_recv_list(int32_t fd);
+void stack_eventlist_count(struct rpc_msg *msg);
+void stack_wakeuplist_count(struct rpc_msg *msg);
void get_lwip_conntable(struct rpc_msg *msg);
void get_lwip_connnum(struct rpc_msg *msg);
void stack_recvlist_count(struct rpc_msg *msg);
@@ -42,5 +44,6 @@ void gazelle_free_pbuf(struct pbuf *pbuf);
ssize_t sendmsg_to_stack(int32_t s, const struct msghdr *message, int32_t flags);
ssize_t recvmsg_from_stack(int32_t s, struct msghdr *message, int32_t flags);
ssize_t gazelle_send(int32_t fd, const void *buf, size_t len, int32_t flags);
+void add_self_event(struct lwip_sock *sock, uint32_t events);
#endif
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index f289465..dd7633b 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -51,6 +51,8 @@ struct protocol_stack {
struct list_node recv_list;
struct list_node listen_list;
+ struct list_node event_list;
+ struct list_node *wakeup_list;
struct gazelle_stat_pkts stats;
struct gazelle_stack_latency latency;
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index 1365234..cffb273 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -50,9 +50,12 @@ struct rpc_msg {
struct protocol_stack;
void poll_rpc_msg(struct protocol_stack *stack);
void rpc_call_replenish_idlembuf(struct protocol_stack *stack);
+void rpc_call_addevent(struct protocol_stack *stack, void *sock);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
+int32_t rpc_call_eventlistcnt(struct protocol_stack *stack);
+int32_t rpc_call_wakeuplistcnt(struct protocol_stack *stack);
int32_t rpc_call_thread_regphase1(struct protocol_stack *stack, void *conn);
int32_t rpc_call_thread_regphase2(struct protocol_stack *stack, void *conn);
int32_t rpc_call_conntable(struct protocol_stack *stack, void *conn_table, uint32_t max_conn);
diff --git a/src/lstack/include/lstack_weakup.h b/src/lstack/include/lstack_weakup.h
index f334a0f..8f7fca2 100644
--- a/src/lstack/include/lstack_weakup.h
+++ b/src/lstack/include/lstack_weakup.h
@@ -22,55 +22,93 @@ struct weakup_poll {
sem_t event_sem;
struct lwip_sock *sock_list[EPOLL_MAX_EVENTS];
struct rte_ring *event_ring;
+ struct rte_ring *self_ring;
};
#define WEAKUP_MAX (32)
-static inline __attribute__((always_inline)) void weakup_attach_sock(struct lwip_sock *sock)
+static inline void wakeup_list_sock(struct list_node *wakeup_list)
{
- struct list_node *list = &(sock->attach_list);
struct list_node *node, *temp;
- struct lwip_sock *attach_sock;
- int32_t ret;
- list_for_each_safe(node, temp, list) {
- attach_sock = container_of(node, struct lwip_sock, attach_list);
- if (attach_sock->weakup == NULL) {
+ list_for_each_safe(node, temp, wakeup_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, wakeup_list);
+
+ struct weakup_poll *weakup = sock->weakup;
+ struct protocol_stack *stack = sock->stack;
+ if (weakup == NULL || stack == NULL) {
continue;
}
- ret = rte_ring_mp_enqueue(attach_sock->weakup->event_ring, (void *)attach_sock);
+ int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
if (ret == 0) {
- sem_post(&attach_sock->weakup->event_sem);
- attach_sock->stack->stats.lwip_events++;
+ list_del_node_init(&sock->event_list);
+ sem_post(&weakup->event_sem);
+ stack->stats.lwip_events++;
+ } else {
+ break;
}
}
}
-static inline __attribute__((always_inline)) void weakup_thread(struct rte_ring *weakup_ring)
+static inline int32_t weakup_attach_sock(struct list_node *attach_list)
+{
+ struct list_node *node, *temp;
+ int32_t wakeuped = -1;
+
+ list_for_each_safe(node, temp, attach_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, attach_list);
+
+ struct weakup_poll *weakup = sock->weakup;
+ struct protocol_stack *stack = sock->stack;
+ if (weakup == NULL || stack == NULL) {
+ continue;
+ }
+
+ int32_t ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
+ if (ret == 0) {
+ sem_post(&weakup->event_sem);
+ stack->stats.lwip_events++;
+ wakeuped = 0;
+ }
+ }
+
+ return wakeuped;
+}
+
+static inline void weakup_thread(struct rte_ring *weakup_ring, struct list_node *wakeup_list)
{
struct lwip_sock *sock;
- int32_t ret;
for (uint32_t i = 0; i < WEAKUP_MAX; ++i) {
- ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock);
+ int32_t ret = rte_ring_sc_dequeue(weakup_ring, (void **)&sock);
if (ret != 0) {
break;
}
- ret = rte_ring_mp_enqueue(sock->weakup->event_ring, (void *)sock);
+ struct weakup_poll *weakup = sock->weakup;
+ struct protocol_stack *stack = sock->stack;
+ if (weakup == NULL || stack == NULL) {
+ continue;
+ }
+
+ ret = rte_ring_mp_enqueue(weakup->event_ring, (void *)sock);
if (ret == 0) {
- sem_post(&sock->weakup->event_sem);
- sock->stack->stats.lwip_events++;
+ sem_post(&weakup->event_sem);
+ stack->stats.lwip_events++;
}
/* listen notice attach sock */
+ int32_t wakeuped = -1;
if (!list_is_empty(&sock->attach_list)) {
- weakup_attach_sock(sock);
+ wakeuped = weakup_attach_sock(&sock->attach_list);
}
- /* event_ring of attach sock may have idle elem */
- if (ret != 0) {
+ /* notice any epoll enough */
+ if (ret != 0 && wakeuped != 0) {
+ if (list_is_empty(&sock->wakeup_list)) {
+ list_add_node(wakeup_list, &sock->wakeup_list);
+ }
break;
}
}
@@ -79,13 +117,7 @@ static inline __attribute__((always_inline)) void weakup_thread(struct rte_ring
static inline __attribute__((always_inline))
int weakup_enqueue(struct rte_ring *weakup_ring, struct lwip_sock *sock)
{
- int ret = rte_ring_sp_enqueue(weakup_ring, (void *)sock);
- if (ret < 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %d, failed\n", gettid());
- return -1;
- }
-
- return 0;
+ return rte_ring_sp_enqueue(weakup_ring, (void *)sock);
}
#endif
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 451f527..66d6053 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -567,14 +567,20 @@ static void show_lstack_stats(struct gazelle_stack_dfx_data *lstack_stat)
printf("weakup_events: %-14"PRIu64" ", lstack_stat->data.pkts.weakup_events);
printf("lwip_events: %-16"PRIu64" ", lstack_stat->data.pkts.lwip_events);
printf("app_events: %-17"PRIu64"\n", lstack_stat->data.pkts.app_events);
+ printf("epoll_pending: %-14"PRIu64" ", lstack_stat->data.pkts.epoll_pending);
+ printf("epoll_self_event: %-11"PRIu64" ", lstack_stat->data.pkts.epoll_self_event);
+ printf("remove_event: %-15"PRIu64" \n", lstack_stat->data.pkts.remove_event);
printf("read_events: %-16"PRIu64" ", lstack_stat->data.pkts.read_events);
printf("write_events: %-15"PRIu64" ", lstack_stat->data.pkts.write_events);
printf("accept_events: %-14"PRIu64" \n", lstack_stat->data.pkts.accept_events);
- printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
printf("read_null: %-18"PRIu64" ", lstack_stat->data.pkts.read_null);
- printf("call_alloc_fail: %-12"PRIu64" \n", lstack_stat->data.pkts.call_alloc_fail);
- printf("remove_event: %-15"PRIu64" ", lstack_stat->data.pkts.remove_event);
+ printf("wakeup_list: %-16"PRIu64" ", lstack_stat->data.pkts.wakeup_list);
+ printf("event_list: %-17"PRIu64" \n", lstack_stat->data.pkts.event_list);
printf("send_self_rpc: %-14"PRIu64" ", lstack_stat->data.pkts.send_self_rpc);
+ printf("epoll_pending_call: %-9"PRIu64" ", lstack_stat->data.pkts.epoll_pending_call);
+ printf("epoll_self_call: %-12"PRIu64" \n", lstack_stat->data.pkts.epoll_self_call);
+ printf("call_msg: %-19"PRIu64" ", lstack_stat->data.pkts.call_msg_cnt);
+ printf("call_alloc_fail: %-12"PRIu64" ", lstack_stat->data.pkts.call_alloc_fail);
printf("call_null: %-18"PRIu64" \n", lstack_stat->data.pkts.call_null);
}
@@ -866,8 +872,7 @@ static void gazelle_print_lstack_stat_snmp(void *buf, const struct gazelle_stat_
static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_msg_request *req_msg)
{
- int32_t ret;
- uint32_t i, unread_pkts;
+ uint32_t i;
struct in_addr rip;
struct in_addr lip;
char str_ip[GAZELLE_SUBNET_LENGTH_MAX] = {0};
@@ -878,30 +883,33 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
printf("Active Internet connections (servers and established)\n");
do {
printf("\n------ stack tid: %6u ------\n", stat->tid);
- printf("No. Proto recv_cnt recv_ring in_send send_ring Local Address"
+ printf("No. Proto recv_cnt recv_ring in_send send_ring event self_event Local Address"
" Foreign Address State\n");
- unread_pkts = 0;
+ uint32_t unread_pkts = 0;
+ uint32_t unsend_pkts = 0;
for (i = 0; i < conn->conn_num && i < GAZELLE_LSTACK_MAX_CONN; i++) {
struct gazelle_stat_lstack_conn_info *conn_info = &conn->conn_list[i];
rip.s_addr = conn_info->rip;
lip.s_addr = conn_info->lip;
if ((conn_info->state == GAZELLE_ACTIVE_LIST) || (conn_info->state == GAZELLE_TIME_WAIT_LIST)) {
- printf("%-6utcp %-10u%-11u%-9u%-11u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
- conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt,
- inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, inet_ntop(AF_INET, &rip,
- str_rip, sizeof(str_rip)), conn_info->r_port, tcp_state_to_str(conn_info->tcp_sub_state));
+ printf("%-6utcp %-10u%-11u%-9u%-11u%-7u%-12u%s:%hu\t%s:%hu\t%s\n", i, conn_info->recv_cnt,
+ conn_info->recv_ring_cnt, conn_info->in_send, conn_info->send_ring_cnt, conn_info->event_ring_cnt,
+ conn_info->self_ring_cnt, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port,
+ inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
+ tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-41u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
+ printf("%-6utcp %-60u%s:%hu\t0.0.0.0:*\t\tLISTEN\n", i, conn_info->recv_cnt,
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state);
}
unread_pkts += conn_info->recv_ring_cnt;
+ unsend_pkts += conn_info->send_ring_cnt;
}
if (conn->conn_num > 0) {
- printf("Total unread pkts: %u \n", unread_pkts);
+ printf("Total unread pkts:%u unsend pkts:%u\n", unread_pkts, unsend_pkts);
}
if (i < conn->total_conn_num) {
@@ -912,7 +920,7 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
if (stat->eof != 0) {
break;
}
- ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode);
+ int32_t ret = dfx_stat_read_from_ltran(buf, sizeof(struct gazelle_stack_dfx_data), req_msg->stat_mode);
if (ret != GAZELLE_OK) {
return;
}
--
1.8.3.1
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。