1 Star 0 Fork 32

misaka00251/gazelle

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0111-refactor-event-notice.patch 43.90 KB
一键复制 编辑 原始数据 按行查看 历史
吴昌盛 提交于 2022-10-08 22:27 . adapt ceph client
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243
From b41e22d7d8af5c990edc04130d885c9bd675461e Mon Sep 17 00:00:00 2001
From: chengyechun <chengyechun1@huawei.com>
Date: Thu, 20 Oct 2022 11:29:16 +0800
Subject: [PATCH 2/2] refactor event notice
---
src/common/gazelle_opt.h | 1 +
src/lstack/api/lstack_epoll.c | 382 +++++++++++++--------
src/lstack/api/lstack_wrap.c | 9 +
src/lstack/core/lstack_lwip.c | 4 +-
src/lstack/core/lstack_protocol_stack.c | 148 +++++---
src/lstack/core/lstack_stack_stat.c | 67 +---
src/lstack/core/lstack_thread_rpc.c | 15 +-
src/lstack/include/lstack_protocol_stack.h | 16 +-
src/lstack/include/lstack_stack_stat.h | 1 -
src/lstack/include/lstack_thread_rpc.h | 2 +
src/lstack/include/posix/lstack_epoll.h | 15 +-
src/lstack/netif/lstack_ethdev.c | 2 +-
src/ltran/ltran_dfx.c | 4 +-
13 files changed, 410 insertions(+), 256 deletions(-)
diff --git a/src/common/gazelle_opt.h b/src/common/gazelle_opt.h
index f2ec163..011553c 100644
--- a/src/common/gazelle_opt.h
+++ b/src/common/gazelle_opt.h
@@ -24,6 +24,7 @@
#define GAZELLE_FALSE 0
#define PROTOCOL_STACK_MAX 32
+#define KERNEL_EPOLL_MAX 512
#define ETHER_ADDR_LEN 6
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index 1206e75..417d69c 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -40,36 +40,54 @@
#define SEC_TO_NSEC 1000000000
#define SEC_TO_MSEC 1000
#define MSEC_TO_NSEC 1000000
-#define POLL_KERNEL_EVENTS 128
+#define POLL_KERNEL_EVENTS 32
+
+static void update_epoll_max_stack(struct wakeup_poll *wakeup);
+static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack,
+ struct protocol_stack *new_stack);
void add_sock_event(struct lwip_sock *sock, uint32_t event)
{
struct wakeup_poll *wakeup = sock->wakeup;
- if (wakeup == NULL || (event & sock->epoll_events) == 0) {
+ if (wakeup == NULL || wakeup->type == WAKEUP_CLOSE || (event & sock->epoll_events) == 0) {
return;
}
- wakeup->have_event = true;
- sock->stack->have_event = true;
-
- if (wakeup->type == WAKEUP_POLL) {
- return;
+ if (wakeup->type == WAKEUP_EPOLL) {
+ pthread_spin_lock(&wakeup->event_list_lock);
+ sock->events |= (event == EPOLLERR) ? (EPOLLIN | EPOLLERR) : (event & sock->epoll_events);
+ if (list_is_null(&sock->event_list)) {
+ list_add_node(&wakeup->event_list, &sock->event_list);
+ }
+ pthread_spin_unlock(&wakeup->event_list_lock);
}
- pthread_spin_lock(&wakeup->event_list_lock);
- sock->events |= (event == EPOLLERR) ? (EPOLLIN | EPOLLERR) : (event & sock->epoll_events);
- if (list_is_null(&sock->event_list)) {
- list_add_node(&wakeup->event_list, &sock->event_list);
+ struct protocol_stack *stack = sock->stack;
+ if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
}
- pthread_spin_unlock(&wakeup->event_list_lock);
}
-void wakeup_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup)
+void wakeup_stack_epoll(struct protocol_stack *stack)
{
- if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) {
- uint64_t tmp = 1;
- posix_api->write_fn(wakeup->eventfd, &tmp, sizeof(tmp));
- stack->stats.wakeup_events++;
+ struct list_node *node, *temp;
+
+ list_for_each_safe(node, temp, &stack->wakeup_list) {
+ struct wakeup_poll *wakeup = container_of((node - stack->queue_id), struct wakeup_poll, wakeup_list);
+
+ if (!get_protocol_stack_group()->wakeup_enable) {
+ if (__atomic_load_n(&wakeup->in_wait, __ATOMIC_ACQUIRE)) {
+ __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
+ rte_mb();
+ pthread_mutex_unlock(&wakeup->wait);
+ stack->stats.wakeup_events++;
+ }
+ } else {
+ gazelle_light_ring_enqueue_busrt(stack->wakeup_ring, (void **)&wakeup, 1);
+ stack->stats.wakeup_events++;
+ }
+
+ list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
}
}
@@ -101,7 +119,11 @@ static void raise_pending_events(struct wakeup_poll *wakeup, struct lwip_sock *s
{
sock->events = update_events(sock);
if (sock->events) {
- add_sock_event(sock, sock->events);
+ pthread_spin_lock(&wakeup->event_list_lock);
+ if (wakeup->type == WAKEUP_EPOLL && list_is_null(&sock->event_list)) {
+ list_add_node(&wakeup->event_list, &sock->event_list);
+ }
+ pthread_spin_unlock(&wakeup->event_list_lock);
}
}
@@ -125,23 +147,23 @@ int32_t lstack_do_epoll_create(int32_t fd)
GAZELLE_RETURN(EINVAL);
}
- wakeup->eventfd = eventfd(0, EFD_NONBLOCK);
- if (wakeup->eventfd < 0) {
- LSTACK_LOG(ERR, LSTACK, "eventfd fail=%d errno=%d\n", wakeup->eventfd, errno);
- posix_api->close_fn(fd);
- free(wakeup);
- GAZELLE_RETURN(EINVAL);
+ for (uint32_t i = 0; i < PROTOCOL_STACK_MAX; i++) {
+ init_list_node_null(&wakeup->wakeup_list[i]);
}
- struct epoll_event event;
- event.data.fd = wakeup->eventfd;
- event.events = EPOLLIN | EPOLLET;
- if (posix_api->epoll_ctl_fn(fd, EPOLL_CTL_ADD, wakeup->eventfd, &event) < 0) {
- LSTACK_LOG(ERR, LSTACK, "ctl eventfd errno=%d\n", errno);
+ if (pthread_mutex_init(&wakeup->wait, NULL) != 0) {
posix_api->close_fn(fd);
free(wakeup);
GAZELLE_RETURN(EINVAL);
}
+ pthread_mutex_trylock(&wakeup->wait);
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ init_list_node_null(&wakeup->poll_list);
+ pthread_spin_lock(&stack_group->poll_list_lock);
+ list_add_node(&stack_group->poll_list, &wakeup->poll_list);
+ pthread_spin_unlock(&stack_group->poll_list_lock);
init_list_node(&wakeup->event_list);
pthread_spin_init(&wakeup->event_list_lock, PTHREAD_PROCESS_PRIVATE);
@@ -150,6 +172,9 @@ int32_t lstack_do_epoll_create(int32_t fd)
wakeup->epollfd = fd;
sock->wakeup = wakeup;
+ update_epoll_max_stack(wakeup);
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack);
+
return fd;
}
@@ -167,24 +192,41 @@ int32_t lstack_epoll_create(int32_t flags)
int32_t lstack_epoll_close(int32_t fd)
{
- posix_api->close_fn(fd);
-
struct lwip_sock *sock = get_socket_by_fd(fd);
if (sock == NULL) {
LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno);
GAZELLE_RETURN(EINVAL);
}
- if (sock->wakeup) {
- if (sock->wakeup->bind_stack) {
- unregister_wakeup(sock->wakeup->bind_stack, sock->wakeup);
- }
- posix_api->close_fn(sock->wakeup->eventfd);
- pthread_spin_destroy(&sock->wakeup->event_list_lock);
- free(sock->wakeup);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct wakeup_poll *wakeup = sock->wakeup;
+ if (wakeup == NULL) {
+ return 0;
}
+
+ wakeup->type = WAKEUP_CLOSE;
+
+ stack_broadcast_clean_epoll(wakeup);
+
+ struct list_node *node, *temp;
+ pthread_spin_lock(&wakeup->event_list_lock);
+ list_for_each_safe(node, temp, &wakeup->event_list) {
+ struct lwip_sock *sock = container_of(node, struct lwip_sock, event_list);
+ list_del_node_null(&sock->event_list);
+ }
+ pthread_spin_unlock(&wakeup->event_list_lock);
+ pthread_spin_destroy(&wakeup->event_list_lock);
+
+ pthread_spin_lock(&stack_group->poll_list_lock);
+ list_del_node_null(&wakeup->poll_list);
+ pthread_spin_unlock(&stack_group->poll_list_lock);
+
+ pthread_mutex_destroy(&wakeup->wait);
+
+ free(wakeup);
sock->wakeup = NULL;
+ posix_api->close_fn(fd);
return 0;
}
@@ -243,12 +285,10 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
struct wakeup_poll *wakeup = epoll_sock->wakeup;
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
- wakeup->have_kernel_fd = true;
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
if (CONN_TYPE_HAS_HOST(sock->conn)) {
- wakeup->have_kernel_fd = true;
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
if (ret < 0) {
LSTACK_LOG(ERR, LSTACK, "fd=%d epfd=%d op=%d\n", fd, epfd, op);
@@ -347,36 +387,40 @@ static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
return event_num;
}
-static void epoll_bind_statck(struct wakeup_poll *wakeup)
+static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack,
+ struct protocol_stack *new_stack)
{
- /* all fd is kernel, set rand stack */
- if (wakeup->bind_stack == NULL && wakeup->max_stack == NULL) {
- update_epoll_max_stack(wakeup);
+ if (old_stack) {
+ if (posix_api->epoll_ctl_fn(old_stack->epollfd, EPOLL_CTL_DEL, wakeup->epollfd, NULL) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
+ }
}
- if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) {
- if (get_global_cfg_params()->app_bind_numa) {
- bind_to_stack_numa(wakeup->max_stack);
- }
- if (wakeup->bind_stack) {
- unregister_wakeup(wakeup->bind_stack, wakeup);
- }
- wakeup->bind_stack = wakeup->max_stack;
- register_wakeup(wakeup->bind_stack, wakeup);
+ /* avoid kernel thread post too much, use EPOLLET */
+ struct epoll_event event;
+ event.data.ptr = wakeup;
+ event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET;
+ if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
}
}
-static bool del_event_fd(struct epoll_event* events, int32_t eventnum, int32_t eventfd)
+static void epoll_bind_statck(struct wakeup_poll *wakeup)
{
- for (int32_t i = 0; i < eventnum; i++) {
- if (events[i].data.fd == eventfd) {
- events[i].data.u64 = events[eventnum - 1].data.u64;
- events[i].events = events[eventnum - 1].events;
- return true;
- }
+ if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) {
+ bind_to_stack_numa(wakeup->max_stack);
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack);
+ wakeup->bind_stack = wakeup->max_stack;
}
+}
- return false;
+static void ms_to_timespec(struct timespec *timespec, int32_t timeout)
+{
+ clock_gettime(CLOCK_REALTIME, timespec);
+ timespec->tv_sec += timeout / SEC_TO_MSEC;
+ timespec->tv_nsec += (timeout % SEC_TO_MSEC) * MSEC_TO_NSEC;
+ timespec->tv_sec += timespec->tv_nsec / SEC_TO_NSEC;
+ timespec->tv_nsec = timespec->tv_nsec % SEC_TO_NSEC;
}
int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
@@ -388,63 +432,84 @@ int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxe
struct wakeup_poll *wakeup = sock->wakeup;
int32_t kernel_num = 0;
+ int32_t lwip_num = 0;
+ int32_t ret = 0;
- epoll_bind_statck(sock->wakeup);
+ if (get_global_cfg_params()->app_bind_numa) {
+ epoll_bind_statck(sock->wakeup);
+ }
- __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
- rte_mb();
+ do {
+ lwip_num = epoll_lwip_event(wakeup, events, maxevents);
+ wakeup->stat.app_events += lwip_num;
- int32_t lwip_num = epoll_lwip_event(wakeup, events, maxevents);
- wakeup->stat.app_events += lwip_num;
- if (!wakeup->have_kernel_fd && lwip_num > 0) {
- return lwip_num;
- }
+ if (__atomic_load_n(&wakeup->have_kernel_event, __ATOMIC_ACQUIRE)) {
+ __atomic_store_n(&wakeup->have_kernel_event, false, __ATOMIC_RELEASE);
+ kernel_num = posix_api->epoll_wait_fn(epfd, &events[lwip_num], maxevents - lwip_num, 0);
+ }
- if (lwip_num > 0) {
- __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
- rte_mb();
- kernel_num = posix_api->epoll_wait_fn(epfd, &events[lwip_num], maxevents - lwip_num, 0);
- } else {
- kernel_num = posix_api->epoll_wait_fn(epfd, &events[lwip_num], maxevents - lwip_num, timeout);
- rte_mb();
- __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
- }
+ if (lwip_num + kernel_num > 0) {
+ return lwip_num + kernel_num;
+ }
- if (kernel_num <= 0) {
- return (lwip_num > 0) ? lwip_num : kernel_num;
- }
+ if (timeout < 0) {
+ ret = pthread_mutex_lock(&wakeup->wait);
+ } else {
+ struct timespec epoll_time;
+ ms_to_timespec(&epoll_time, timeout);
+ ret = pthread_mutex_timedlock(&wakeup->wait, &epoll_time);
+ }
- if (del_event_fd(&events[lwip_num], kernel_num, wakeup->eventfd)) {
- kernel_num--;
- if (lwip_num == 0) {
- lwip_num = epoll_lwip_event(wakeup, &events[kernel_num], maxevents - kernel_num);
+ if (ret == 0) {
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
}
- }
+ } while (ret == 0);
- return lwip_num + kernel_num;
+ return 0;
}
static int32_t init_poll_wakeup_data(struct wakeup_poll *wakeup)
{
- wakeup->type = WAKEUP_POLL;
+ if (pthread_mutex_init(&wakeup->wait, NULL) != 0) {
+ GAZELLE_RETURN(EINVAL);
+ }
+ pthread_mutex_trylock(&wakeup->wait);
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
+
+ for (uint32_t i = 0; i < PROTOCOL_STACK_MAX; i++) {
+ init_list_node_null(&wakeup->wakeup_list[i]);
+ }
- wakeup->eventfd = eventfd(0, EFD_NONBLOCK);
- if (wakeup->eventfd < 0) {
- LSTACK_LOG(ERR, LSTACK, "eventfd failed errno=%d\n", errno);
+ wakeup->epollfd = posix_api->epoll_create_fn(POLL_KERNEL_EVENTS);
+ if (wakeup->epollfd < 0) {
GAZELLE_RETURN(EINVAL);
}
+ wakeup->type = WAKEUP_POLL;
+
wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
if (wakeup->last_fds == NULL) {
- LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
- posix_api->close_fn(wakeup->eventfd);
GAZELLE_RETURN(EINVAL);
}
-
- wakeup->last_fds[0].fd = wakeup->eventfd;
- wakeup->last_fds[0].events = POLLIN;
wakeup->last_max_nfds = POLL_KERNEL_EVENTS;
+ wakeup->events = calloc(POLL_KERNEL_EVENTS, sizeof(struct epoll_event));
+ if (wakeup->events == NULL) {
+ free(wakeup->last_fds);
+ GAZELLE_RETURN(EINVAL);
+ }
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ init_list_node_null(&wakeup->poll_list);
+ pthread_spin_lock(&stack_group->poll_list_lock);
+ list_add_node(&stack_group->poll_list, &wakeup->poll_list);
+ pthread_spin_unlock(&stack_group->poll_list_lock);
+
+ int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+ uint16_t bind_id = find_max_cnt_stack(stack_count, stack_group->stack_num, wakeup->bind_stack);
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]);
+ wakeup->bind_stack = stack_group->stacks[bind_id];
+
return 0;
}
@@ -453,13 +518,19 @@ static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds)
if (wakeup->last_fds) {
free(wakeup->last_fds);
}
- wakeup->last_fds = calloc(nfds + 1, sizeof(struct pollfd));
+ wakeup->last_fds = calloc(nfds, sizeof(struct pollfd));
if (wakeup->last_fds == NULL) {
LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
}
- wakeup->last_fds[0].fd = wakeup->eventfd;
- wakeup->last_fds[0].events = POLLIN;
+ if (wakeup->events) {
+ free(wakeup->events);
+ }
+ wakeup->events = calloc(nfds, sizeof(struct epoll_event));
+ if (wakeup->events == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
+ }
+
wakeup->last_max_nfds = nfds;
}
@@ -472,15 +543,25 @@ static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count)
return;
}
- if (wakeup->bind_stack) {
- unregister_wakeup(wakeup->bind_stack, wakeup);
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]);
+ bind_to_stack_numa(stack_group->stacks[bind_id]);
+ wakeup->bind_stack = stack_group->stacks[bind_id];
+}
+
+static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struct pollfd *new_fd)
+{
+ posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_DEL, wakeup->last_fds[index].fd, NULL);
+
+ if (new_fd == NULL) {
+ return;
}
-
- if (get_global_cfg_params()->app_bind_numa) {
- bind_to_stack_numa(stack_group->stacks[bind_id]);
+
+ struct epoll_event event;
+ event.data.u32 = index;
+ event.events = new_fd->events;
+ if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
}
- wakeup->bind_stack = stack_group->stacks[bind_id];
- register_wakeup(wakeup->bind_stack, wakeup);
}
static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds)
@@ -494,31 +575,33 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
poll_change = 1;
}
+ if (nfds < wakeup->last_nfds) {
+ poll_change = 1;
+ }
+
for (uint32_t i = 0; i < nfds; i++) {
int32_t fd = fds[i].fd;
fds[i].revents = 0;
struct lwip_sock *sock = get_socket_by_fd(fd);
- if (fd == wakeup->last_fds[i + 1].fd && fds[i].events == wakeup->last_fds[i + 1].events) {
+ if (fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
/* fd close then socket may get same fd. */
if (sock == NULL || sock->wakeup != NULL) {
continue;
}
}
- wakeup->last_fds[i + 1].fd = fd;
- wakeup->last_fds[i + 1].events = fds[i].events;
+ wakeup->last_fds[i].fd = fd;
+ wakeup->last_fds[i].events = fds[i].events;
poll_change = 1;
- while (sock && sock->conn) {
- if (sock->epoll_events != (fds[i].events | POLLERR)) {
- sock->epoll_events = fds[i].events | POLLERR;
- }
- if (sock->wakeup != wakeup) {
- sock->wakeup = wakeup;
- }
+ if (sock == NULL || sock->conn == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
+ update_kernel_poll(wakeup, i, fds + i);
+ }
+ while (sock && sock->conn) {
+ sock->epoll_events = fds[i].events | POLLERR;
+ sock->wakeup = wakeup;
stack_count[sock->stack->queue_id]++;
- /* listenfd list */
sock = sock->listen_next;
}
}
@@ -526,9 +609,11 @@ static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfd
if (poll_change == 0) {
return;
}
- wakeup->last_nfds = nfds + 1;
+ wakeup->last_nfds = nfds;
- poll_bind_statck(wakeup, stack_count);
+ if (get_global_cfg_params()->app_bind_numa) {
+ poll_bind_statck(wakeup, stack_count);
+ }
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
@@ -548,43 +633,38 @@ int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
poll_init(wakeup, fds, nfds);
- __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
- rte_mb();
-
- int32_t lwip_num = poll_lwip_event(fds, nfds);
- wakeup->stat.app_events += lwip_num;
- if (lwip_num >= nfds) {
- __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
- return lwip_num;
- }
-
int32_t kernel_num = 0;
- if (lwip_num > 0) {
- __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
- rte_mb();
- kernel_num = posix_api->poll_fn(wakeup->last_fds, wakeup->last_nfds, 0);
- } else {
- kernel_num = posix_api->poll_fn(wakeup->last_fds, wakeup->last_nfds, timeout);
- rte_mb();
- __atomic_store_n(&wakeup->in_wait, false, __ATOMIC_RELEASE);
- }
+ int32_t lwip_num = 0;
+ int32_t ret;
- if (kernel_num <= 0) {
- return (lwip_num > 0) ? lwip_num : kernel_num;
- }
+ do {
+ lwip_num = poll_lwip_event(fds, nfds);
+
+ if (__atomic_load_n(&wakeup->have_kernel_event, __ATOMIC_ACQUIRE)) {
+ __atomic_store_n(&wakeup->have_kernel_event, false, __ATOMIC_RELEASE);
+ kernel_num = posix_api->epoll_wait_fn(wakeup->epollfd, wakeup->events, nfds, 0);
+ for (int32_t i = 0; i < kernel_num; i++) {
+ uint32_t index = wakeup->events[i].data.u32;
+ fds[index].revents = wakeup->events[i].events;
+ }
+ }
- for (nfds_t i = 0; i < nfds; i++) {
- if (fds[i].revents == 0 && wakeup->last_fds[i + 1].revents != 0) {
- fds[i].revents = wakeup->last_fds[i + 1].revents;
+ if (lwip_num + kernel_num > 0) {
+ return lwip_num + kernel_num;
}
- }
- if (wakeup->last_fds[0].revents) {
- if (lwip_num == 0) {
- lwip_num = poll_lwip_event(fds, nfds);
+ if (timeout < 0) {
+ ret = pthread_mutex_lock(&wakeup->wait);
+ } else {
+ struct timespec epoll_time;
+ ms_to_timespec(&epoll_time, timeout);
+ ret = pthread_mutex_timedlock(&wakeup->wait, &epoll_time);
}
- kernel_num--;
- }
- return kernel_num + lwip_num;
+ if (ret == 0) {
+ __atomic_store_n(&wakeup->in_wait, true, __ATOMIC_RELEASE);
+ }
+ } while (ret == 0);
+
+ return 0;
}
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index 4669a30..8dc4524 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -203,6 +203,15 @@ static int32_t do_connect(int32_t s, const struct sockaddr *name, socklen_t name
return posix_api->connect_fn(s, name, namelen);
}
+ struct lwip_sock *sock = get_socket(s);
+ if (sock == NULL) {
+ return posix_api->connect_fn(s, name, namelen);
+ }
+
+ if (!netconn_is_nonblocking(sock->conn)) {
+ GAZELLE_RETURN(EINVAL);
+ }
+
int32_t ret = rpc_call_connect(s, name, namelen);
if (ret == 0 || errno == EISCONN) {
return ret;
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 94f95fa..00afc75 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -234,13 +234,13 @@ struct pbuf *write_lwip_data(struct lwip_sock *sock, uint16_t remain_size, uint8
return NULL;
}
- sock->stack->stats.write_lwip_cnt++;
return pbuf;
}
void write_lwip_over(struct lwip_sock *sock, uint32_t n)
{
gazelle_ring_dequeue_over(sock->send_ring, n);
+ sock->stack->stats.write_lwip_cnt += n;
}
static inline void del_data_out_event(struct lwip_sock *sock)
@@ -269,6 +269,7 @@ void write_stack_over(struct lwip_sock *sock)
gazelle_ring_read_over(sock->send_ring);
if (sock->wakeup) {
+ sock->wakeup->stat.app_write_cnt++;
if (sock->wakeup->type == WAKEUP_EPOLL && (sock->events & EPOLLOUT)) {
del_data_out_event(sock);
}
@@ -882,6 +883,7 @@ void get_lwip_conntable(struct rpc_msg *msg)
conn[conn_num].l_port = pcbl->local_port;
conn[conn_num].tcp_sub_state = pcbl->state;
struct netconn *netconn = (struct netconn *)pcbl->callback_arg;
+ conn[conn_num].fd = netconn->socket;
if (netconn != NULL && netconn->acceptmbox != NULL) {
conn[conn_num].recv_cnt = rte_ring_count(netconn->acceptmbox->ring);
}
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index c381187..79769f3 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -36,12 +36,12 @@
#include "posix/lstack_epoll.h"
#include "lstack_stack_stat.h"
-#define READ_LIST_MAX 32
-#define SEND_LIST_MAX 32
-#define HANDLE_RPC_MSG_MAX 32
-#define KERNEL_EPOLL_MAX 256
+#define READ_LIST_MAX 128
+#define SEND_LIST_MAX 128
+#define HANDLE_RPC_MSG_MAX 128
+#define KERNEL_EVENT_100us 100
-static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
+static PER_THREAD struct protocol_stack *g_stack_p = NULL;
static struct protocol_stack_group g_stack_group = {0};
void set_init_fail(void);
@@ -57,12 +57,13 @@ void bind_to_stack_numa(struct protocol_stack *stack)
ret = pthread_setaffinity_np(tid, sizeof(stack->idle_cpuset), &stack->idle_cpuset);
if (ret != 0) {
LSTACK_LOG(ERR, LSTACK, "thread %d setaffinity to stack %hu failed\n", rte_gettid(), stack->queue_id);
+ return;
}
}
static inline void set_stack_idx(uint16_t idx)
{
- g_stack_idx = idx;
+ g_stack_p = g_stack_group.stacks[idx];
}
long get_stack_tid(void)
@@ -83,10 +84,7 @@ struct protocol_stack_group *get_protocol_stack_group(void)
struct protocol_stack *get_protocol_stack(void)
{
- if (g_stack_idx >= PROTOCOL_STACK_MAX) {
- return NULL;
- }
- return g_stack_group.stacks[g_stack_idx];
+ return g_stack_p;
}
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd)
@@ -241,10 +239,33 @@ static void* gazelle_wakeup_thread(void *arg)
nanosleep(&st, NULL);
}
- sem_t *event_sem[WAKEUP_MAX_NUM];
- uint32_t num = gazelle_light_ring_dequeue_burst(stack->wakeup_ring, (void **)event_sem, WAKEUP_MAX_NUM);
+ struct wakeup_poll *wakeup[WAKEUP_MAX_NUM];
+ uint32_t num = gazelle_light_ring_dequeue_burst(stack->wakeup_ring, (void **)wakeup, WAKEUP_MAX_NUM);
for (uint32_t i = 0; i < num; i++) {
- sem_post(event_sem[i]);
+ if (__atomic_load_n(&wakeup[i]->in_wait, __ATOMIC_ACQUIRE)) {
+ __atomic_store_n(&wakeup[i]->in_wait, false, __ATOMIC_RELEASE);
+ rte_mb();
+ pthread_mutex_unlock(&wakeup[i]->wait);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+static void* gazelle_kernelevent_thread(void *arg)
+{
+ uint16_t queue_id = *(uint16_t *)arg;
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+
+ bind_to_stack_numa(stack);
+
+ LSTACK_LOG(INFO, LSTACK, "kernelevent_%02hu start\n", queue_id);
+
+ for (;;) {
+ stack->kernel_event_num = posix_api->epoll_wait_fn(stack->epollfd, stack->kernel_events, KERNEL_EPOLL_MAX, -1);
+ while (stack->kernel_event_num > 0) {
+ usleep(KERNEL_EVENT_100us);
}
}
@@ -255,27 +276,26 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
- set_stack_idx(queue_id);
stack->tid = rte_gettid();
stack->queue_id = queue_id;
stack->port_id = stack_group->port_id;
stack->cpu_id = get_global_cfg_params()->cpus[queue_id];
stack->lwip_stats = &lwip_stats;
- pthread_spin_init(&stack->wakeup_list_lock, PTHREAD_PROCESS_PRIVATE);
-
init_list_node(&stack->recv_list);
init_list_node(&stack->send_list);
+ init_list_node(&stack->wakeup_list);
sys_calibrate_tsc();
stack_stat_init();
stack_group->stacks[queue_id] = stack;
+ set_stack_idx(queue_id);
- if (thread_affinity_init(stack->cpu_id) != 0) {
+ stack->epollfd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
+ if (stack->epollfd < 0) {
return -1;
}
- RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
stack->socket_id = numa_node_of_cpu(stack->cpu_id);
if (stack->socket_id < 0) {
@@ -302,6 +322,23 @@ void wait_sem_value(sem_t *sem, int32_t wait_value)
} while (sem_val < wait_value);
}
+static int32_t create_affiliate_thread(uint16_t queue_id, bool wakeup_enable)
+{
+ if (wakeup_enable) {
+ if (create_thread(queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno);
+ return -1;
+ }
+ }
+
+ if (create_thread(queue_id, "gazellekernel", gazelle_kernelevent_thread) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "gazellekernel errno=%d\n", errno);
+ return -1;
+ }
+
+ return 0;
+}
+
static struct protocol_stack *stack_thread_init(uint16_t queue_id)
{
struct protocol_stack_group *stack_group = get_protocol_stack_group();
@@ -319,6 +356,19 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id)
return NULL;
}
+ if (create_affiliate_thread(queue_id, stack_group->wakeup_enable) < 0) {
+ sem_post(&stack_group->thread_phase1);
+ free(stack);
+ return NULL;
+ }
+
+ if (thread_affinity_init(stack->cpu_id) != 0) {
+ sem_post(&stack_group->thread_phase1);
+ free(stack);
+ return NULL;
+ }
+ RTE_PER_LCORE(_lcore_id) = stack->cpu_id;
+
hugepage_init();
tcpip_init(NULL, NULL);
@@ -342,35 +392,28 @@ static struct protocol_stack *stack_thread_init(uint16_t queue_id)
return NULL;
}
- if (stack_group->wakeup_enable) {
- if (create_thread(stack->queue_id, "gazelleweakup", gazelle_wakeup_thread) != 0) {
- LSTACK_LOG(ERR, LSTACK, "gazelleweakup errno=%d\n", errno);
- free(stack);
- return NULL;
- }
- }
-
return stack;
}
-static void wakeup_stack_wait(struct protocol_stack *stack)
+static void wakeup_kernel_event(struct protocol_stack *stack)
{
- if (!stack->have_event || pthread_spin_trylock(&stack->wakeup_list_lock)) {
+ if (stack->kernel_event_num == 0) {
return;
}
- struct wakeup_poll *node = stack->wakeup_list;
- while (node) {
- if (node->have_event) {
- wakeup_epoll(stack, node);
- node->have_event = false;
+ for (int32_t i = 0; i < stack->kernel_event_num; i++) {
+ struct wakeup_poll *wakeup = stack->kernel_events[i].data.ptr;
+ if (wakeup->type == WAKEUP_CLOSE) {
+ continue;
}
- node = node->next;
- }
- pthread_spin_unlock(&stack->wakeup_list_lock);
+ __atomic_store_n(&wakeup->have_kernel_event, true, __ATOMIC_RELEASE);
+ if (list_is_null(&wakeup->wakeup_list[stack->queue_id])) {
+ list_add_node(&stack->wakeup_list, &wakeup->wakeup_list[stack->queue_id]);
+ }
+ }
- stack->have_event = false;
+ stack->kernel_event_num = 0;
}
static void* gazelle_stack_thread(void *arg)
@@ -398,7 +441,9 @@ static void* gazelle_stack_thread(void *arg)
send_stack_list(stack, SEND_LIST_MAX);
- wakeup_stack_wait(stack);
+ wakeup_kernel_event(stack);
+
+ wakeup_stack_epoll(stack);
sys_timer_run();
@@ -445,6 +490,9 @@ int32_t init_protocol_stack(void)
stack_group->stack_num = get_global_cfg_params()->num_cpu;
stack_group->wakeup_enable = (get_global_cfg_params()->num_wakeup > 0) ? true : false;
+ init_list_node(&stack_group->poll_list);
+ pthread_spin_init(&stack_group->poll_list_lock, PTHREAD_PROCESS_PRIVATE);
+
if (init_protocol_sem() != 0) {
return -1;
@@ -482,7 +530,10 @@ void stack_socket(struct rpc_msg *msg)
{
msg->result = gazelle_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
if (msg->result < 0) {
- LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
+ msg->result = gazelle_socket(msg->args[MSG_ARG_0].i, msg->args[MSG_ARG_1].i, msg->args[MSG_ARG_2].i);
+ if (msg->result < 0) {
+ LSTACK_LOG(ERR, LSTACK, "tid %ld, %ld socket failed\n", get_stack_tid(), msg->result);
+ }
}
}
@@ -644,6 +695,25 @@ void stack_broadcast_arp(struct rte_mbuf *mbuf, struct protocol_stack *cur_stack
}
}
+void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct protocol_stack *stack = NULL;
+
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ rpc_call_clean_epoll(stack, wakeup);
+ }
+}
+
+void stack_clean_epoll(struct rpc_msg *msg)
+{
+ struct protocol_stack *stack = get_protocol_stack();
+ struct wakeup_poll *wakeup = (struct wakeup_poll *)msg->args[MSG_ARG_0].p;
+
+ list_del_node_null(&wakeup->wakeup_list[stack->queue_id]);
+}
+
/* when fd is listenfd, listenfd of all protocol stack thread will be closed */
int32_t stack_broadcast_close(int32_t fd)
{
diff --git a/src/lstack/core/lstack_stack_stat.c b/src/lstack/core/lstack_stack_stat.c
index c011aed..6261fa9 100644
--- a/src/lstack/core/lstack_stack_stat.c
+++ b/src/lstack/core/lstack_stack_stat.c
@@ -92,59 +92,26 @@ static void set_latency_start_flag(bool start)
}
}
-void register_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup)
+static void get_wakeup_stat(struct protocol_stack_group *stack_group, struct protocol_stack *stack,
+ struct gazelle_wakeup_stat *stat)
{
- pthread_spin_lock(&stack->wakeup_list_lock);
+ struct list_node *node, *temp;
- wakeup->next = stack->wakeup_list;
- stack->wakeup_list = wakeup;
+ pthread_spin_lock(&stack_group->poll_list_lock);
- pthread_spin_unlock(&stack->wakeup_list_lock);
-}
-
-void unregister_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup)
-{
- pthread_spin_lock(&stack->wakeup_list_lock);
-
- struct wakeup_poll *node = stack->wakeup_list;
- struct wakeup_poll *pre = NULL;
-
- while (node && node != wakeup) {
- pre = node;
- node = node->next;
- }
-
- if (node == NULL) {
- pthread_spin_unlock(&stack->wakeup_list_lock);
- return;
- }
+ list_for_each_safe(node, temp, &stack_group->poll_list) {
+ struct wakeup_poll *wakeup = container_of(node, struct wakeup_poll, poll_list);
- if (pre) {
- pre->next = node->next;
- } else {
- stack->wakeup_list = node->next;
- }
- node->next = NULL;
-
- pthread_spin_unlock(&stack->wakeup_list_lock);
-}
-
-static void get_wakeup_stat(struct protocol_stack *stack, struct gazelle_wakeup_stat *stat)
-{
- pthread_spin_lock(&stack->wakeup_list_lock);
-
- struct wakeup_poll *node = stack->wakeup_list;
- while (node) {
- stat->app_events += node->stat.app_events;
- stat->read_null += node->stat.read_null;
- stat->app_write_cnt += node->stat.app_write_cnt;
- stat->app_write_idlefail += node->stat.app_write_idlefail;
- stat->app_read_cnt += node->stat.app_read_cnt;
-
- node = node->next;
+ if (wakeup->bind_stack == stack) {
+ stat->app_events += wakeup->stat.app_events;
+ stat->read_null += wakeup->stat.read_null;
+ stat->app_write_cnt += wakeup->stat.app_write_cnt;
+ stat->app_write_idlefail += wakeup->stat.app_write_idlefail;
+ stat->app_read_cnt += wakeup->stat.app_read_cnt;
+ }
}
- pthread_spin_unlock(&stack->wakeup_list_lock);
+ pthread_spin_unlock(&stack_group->poll_list_lock);
}
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info)
@@ -172,7 +139,7 @@ static void get_stack_stats(struct gazelle_stack_dfx_data *dfx, struct protocol_
return;
}
- get_wakeup_stat(stack, &dfx->data.pkts.wakeup_stat);
+ get_wakeup_stat(stack_group, stack, &dfx->data.pkts.wakeup_stat);
dfx->data.pkts.call_alloc_fail = stack_group->call_alloc_fail;
@@ -251,11 +218,13 @@ static int32_t send_control_cmd_data(int32_t fd, struct gazelle_stack_dfx_data *
int32_t handle_stack_cmd(int32_t fd, enum GAZELLE_STAT_MODE stat_mode)
{
- struct gazelle_stack_dfx_data dfx = {0};
+ struct gazelle_stack_dfx_data dfx;
struct protocol_stack_group *stack_group = get_protocol_stack_group();
for (uint32_t i = 0; i < stack_group->stack_num; i++) {
struct protocol_stack *stack = stack_group->stacks[i];
+
+ memset_s(&dfx, sizeof(dfx), 0, sizeof(dfx));
get_stack_dfx_data(&dfx, stack, stat_mode);
if (!use_ltran() &&
diff --git a/src/lstack/core/lstack_thread_rpc.c b/src/lstack/core/lstack_thread_rpc.c
index bc77909..46cbbe7 100644
--- a/src/lstack/core/lstack_thread_rpc.c
+++ b/src/lstack/core/lstack_thread_rpc.c
@@ -88,7 +88,6 @@ static inline __attribute__((always_inline)) void rpc_msg_free(struct rpc_msg *m
static inline __attribute__((always_inline)) void rpc_call(lockless_queue *queue, struct rpc_msg *msg)
{
- pthread_spin_trylock(&msg->lock);
lockless_queue_mpsc_push(queue, &msg->queue_node);
}
@@ -96,6 +95,7 @@ static inline __attribute__((always_inline)) int32_t rpc_sync_call(lockless_queu
{
int32_t ret;
+ pthread_spin_trylock(&msg->lock);
rpc_call(queue, msg);
// waiting stack unlock
@@ -270,6 +270,18 @@ int32_t rpc_call_close(int fd)
return rpc_sync_call(&stack->rpc_queue, msg);
}
+void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup)
+{
+ struct rpc_msg *msg = rpc_msg_alloc(stack, stack_clean_epoll);
+ if (msg == NULL) {
+ return;
+ }
+
+ msg->args[MSG_ARG_0].p = wakeup;
+
+ rpc_sync_call(&stack->rpc_queue, msg);
+}
+
int32_t rpc_call_bind(int32_t fd, const struct sockaddr *addr, socklen_t addrlen)
{
struct protocol_stack *stack = get_protocol_stack_by_fd(fd);
@@ -447,4 +459,3 @@ int32_t rpc_call_send(int fd, const void *buf, size_t len, int flags)
return 0;
}
-
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 0eda45d..cc2cfb9 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -14,6 +14,7 @@
#define __GAZELLE_PROTOCOL_STACK_H__
#include <semaphore.h>
+#include <sys/epoll.h>
#include <lwip/list.h>
#include <lwip/netif.h>
@@ -50,11 +51,14 @@ struct protocol_stack {
struct reg_ring_msg *reg_buf;
volatile bool low_power;
- struct wakeup_poll *wakeup_list;
- pthread_spinlock_t wakeup_list_lock;
lockless_queue rpc_queue __rte_cache_aligned;
char pad __rte_cache_aligned;
+ /* kernel event thread read/write frequently */
+ struct epoll_event kernel_events[KERNEL_EPOLL_MAX];
+ int32_t kernel_event_num;
+ char pad1 __rte_cache_aligned;
+
struct netif netif;
struct eth_dev_ops *dev_ops;
uint32_t rx_ring_used;
@@ -62,7 +66,7 @@ struct protocol_stack {
struct list_node recv_list;
struct list_node send_list;
- bool have_event;
+ struct list_node wakeup_list;
volatile uint16_t conn_num;
struct stats_ *lwip_stats;
@@ -85,6 +89,8 @@ struct protocol_stack_group {
struct eth_params *eth_params;
struct protocol_stack *stacks[PROTOCOL_STACK_MAX];
bool wakeup_enable;
+ struct list_node poll_list;
+ pthread_spinlock_t poll_list_lock;
/* dfx stats */
bool latency_start;
@@ -117,7 +123,11 @@ int32_t stack_single_listen(int32_t fd, int32_t backlog);
int32_t stack_broadcast_accept(int32_t fd, struct sockaddr *addr, socklen_t *addrlen);
int32_t stack_broadcast_accept4(int32_t fd, struct sockaddr *addr, socklen_t *addrlen, int32_t flags);
+struct wakeup_poll;
+void stack_broadcast_clean_epoll(struct wakeup_poll *wakeup);
+
struct rpc_msg;
+void stack_clean_epoll(struct rpc_msg *msg);
void stack_arp(struct rpc_msg *msg);
void stack_socket(struct rpc_msg *msg);
void stack_close(struct rpc_msg *msg);
diff --git a/src/lstack/include/lstack_stack_stat.h b/src/lstack/include/lstack_stack_stat.h
index 98ffe8f..6057fe1 100644
--- a/src/lstack/include/lstack_stack_stat.h
+++ b/src/lstack/include/lstack_stack_stat.h
@@ -27,7 +27,6 @@ void stack_stat_init(void);
int32_t handle_stack_cmd(int fd, enum GAZELLE_STAT_MODE stat_mode);
uint64_t get_current_time(void);
void lstack_get_low_power_info(struct gazelle_stat_low_power_info *low_power_info);
-void register_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup);
void unregister_wakeup(struct protocol_stack *stack, struct wakeup_poll *wakeup);
#endif /* GAZELLE_STACK_STAT_H */
diff --git a/src/lstack/include/lstack_thread_rpc.h b/src/lstack/include/lstack_thread_rpc.h
index f95bc72..6928f98 100644
--- a/src/lstack/include/lstack_thread_rpc.h
+++ b/src/lstack/include/lstack_thread_rpc.h
@@ -50,7 +50,9 @@ struct rpc_msg {
struct protocol_stack;
struct rte_mbuf;
+struct wakeup_poll;
void poll_rpc_msg(struct protocol_stack *stack, uint32_t max_num);
+void rpc_call_clean_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup);
int32_t rpc_call_msgcnt(struct protocol_stack *stack);
int32_t rpc_call_shadow_fd(struct protocol_stack *stack, int32_t fd, const struct sockaddr *addr, socklen_t addrlen);
int32_t rpc_call_recvlistcnt(struct protocol_stack *stack);
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index 3c8fd1b..d6c81a7 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -17,7 +17,6 @@
#include <stdbool.h>
#include <semaphore.h>
#include <pthread.h>
-#include <sys/eventfd.h>
#include <lwip/list.h>
@@ -31,29 +30,31 @@ extern "C" {
enum wakeup_type {
WAKEUP_EPOLL = 0,
WAKEUP_POLL,
+ WAKEUP_CLOSE,
};
struct protocol_stack;
struct wakeup_poll {
/* stack thread read frequently */
- int32_t eventfd;
enum wakeup_type type;
- bool have_event;
- volatile bool in_wait __rte_cache_aligned;
+ pthread_mutex_t wait __rte_cache_aligned;
+ bool in_wait;
+ struct list_node wakeup_list[PROTOCOL_STACK_MAX];
+ bool have_kernel_event;
char pad __rte_cache_aligned;
struct gazelle_wakeup_stat stat;
struct protocol_stack *bind_stack;
- struct wakeup_poll *next;
+ struct list_node poll_list;
/* poll */
struct pollfd *last_fds;
nfds_t last_nfds;
nfds_t last_max_nfds;
+ struct epoll_event *events;
/* epoll */
int32_t epollfd; /* epoll kernel fd */
- bool have_kernel_fd;
int32_t stack_fd_cnt[PROTOCOL_STACK_MAX];
struct protocol_stack *max_stack;
struct list_node event_list;
@@ -63,7 +64,7 @@ struct wakeup_poll {
struct netconn;
struct lwip_sock;
void add_sock_event(struct lwip_sock *sock, uint32_t event);
-void wakeup_epoll(struct protocol_stack *stack, struct wakeup_poll *wakeup);
+void wakeup_stack_epoll(struct protocol_stack *stack);
int32_t lstack_epoll_create(int32_t size);
int32_t lstack_epoll_create1(int32_t flags);
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 28bf32d..4757d72 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -79,7 +79,7 @@ void eth_dev_recv(struct rte_mbuf *mbuf)
}
}
-#define READ_PKTS_MAX 32
+#define READ_PKTS_MAX 128
int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
diff --git a/src/ltran/ltran_dfx.c b/src/ltran/ltran_dfx.c
index 3d977b5..de97a48 100644
--- a/src/ltran/ltran_dfx.c
+++ b/src/ltran/ltran_dfx.c
@@ -875,8 +875,8 @@ static void gazelle_print_lstack_stat_conn(void *buf, const struct gazelle_stat_
inet_ntop(AF_INET, &rip, str_rip, sizeof(str_rip)), conn_info->r_port,
tcp_state_to_str(conn_info->tcp_sub_state));
} else if (conn_info->state == GAZELLE_LISTEN_LIST) {
- printf("%-6utcp %-57u%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt,
- inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
+ printf("%-6utcp %-50u%-7d%s:%hu 0.0.0.0:* LISTEN\n", i, conn_info->recv_cnt,
+ conn_info->fd, inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port);
} else {
printf("Got unknow tcp conn::%s:%5hu, state:%u\n",
inet_ntop(AF_INET, &lip, str_ip, sizeof(str_ip)), conn_info->l_port, conn_info->state);
--
2.23.0
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/misaka00251/gazelle.git
git@gitee.com:misaka00251/gazelle.git
misaka00251
gazelle
gazelle
master

搜索帮助