1 Star 0 Fork 32

吴昌盛/gazelle-tar

forked from src-openEuler/gazelle 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0048-refactor-kernel-event-poll-epoll.patch 28.43 KB
一键复制 编辑 原始数据 按行查看 历史
jinag12 提交于 2022-07-07 21:58 . backport upstream patches:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
From a74d5b38b2021397d13b13aaa30f41f69be6f475 Mon Sep 17 00:00:00 2001
From: wuchangsheng <wuchangsheng2@huawei.com>
Date: Thu, 21 Apr 2022 17:21:59 +0800
Subject: [PATCH 09/18] refactor kernel event poll/epoll
---
src/lstack/api/lstack_epoll.c | 343 +++++++++++++++------
src/lstack/api/lstack_wrap.c | 21 +-
src/lstack/core/lstack_dpdk.c | 4 +-
src/lstack/core/lstack_init.c | 2 +-
src/lstack/core/lstack_lwip.c | 1 +
src/lstack/core/lstack_protocol_stack.c | 85 ++++-
src/lstack/include/lstack_cfg.h | 1 -
src/lstack/include/lstack_protocol_stack.h | 8 +-
src/lstack/include/posix/lstack_epoll.h | 24 ++
9 files changed, 350 insertions(+), 139 deletions(-)
diff --git a/src/lstack/api/lstack_epoll.c b/src/lstack/api/lstack_epoll.c
index b8d53f6..cba67ea 100644
--- a/src/lstack/api/lstack_epoll.c
+++ b/src/lstack/api/lstack_epoll.c
@@ -15,6 +15,7 @@
#include <sys/epoll.h>
#include <time.h>
#include <poll.h>
+#include <stdatomic.h>
#include <lwip/lwipsock.h>
#include <lwip/sockets.h>
@@ -32,10 +33,14 @@
#include "gazelle_base_func.h"
#include "lstack_lwip.h"
#include "lstack_protocol_stack.h"
+#include "posix/lstack_epoll.h"
#define EPOLL_KERNEL_INTERVAL 10 /* ms */
-#define EPOLL_NSEC_TO_SEC 1000000000
+#define SEC_TO_NSEC 1000000000
+#define SEC_TO_MSEC 1000
+#define MSEC_TO_NSEC 1000000
#define EPOLL_MAX_EVENTS 512
+#define POLL_KERNEL_EVENTS 32
static PER_THREAD struct wakeup_poll g_wakeup_poll = {0};
static bool g_use_epoll = false; /* FIXME: when no epoll close prepare event for performance testing */
@@ -149,12 +154,12 @@ int32_t lstack_epoll_create(int32_t size)
posix_api->close_fn(fd);
GAZELLE_RETURN(EINVAL);
}
-
memset_s(wakeup, sizeof(struct wakeup_poll), 0, sizeof(struct wakeup_poll));
- sem_init(&wakeup->event_sem, 0, 0);
- sock->wakeup = wakeup;
init_list_node(&wakeup->event_list);
+ wakeup->epollfd = fd;
+ sem_init(&wakeup->event_sem, 0, 0);
+ sock->wakeup = wakeup;
g_use_epoll = true;
return fd;
@@ -162,6 +167,8 @@ int32_t lstack_epoll_create(int32_t size)
int32_t lstack_epoll_close(int32_t fd)
{
+ posix_api->close_fn(fd);
+
struct lwip_sock *sock = get_socket_by_fd(fd);
if (sock == NULL) {
LSTACK_LOG(ERR, LSTACK, "fd=%d sock is NULL errno=%d\n", fd, errno);
@@ -176,6 +183,43 @@ int32_t lstack_epoll_close(int32_t fd)
return 0;
}
+static uint16_t find_max_cnt_stack(int32_t *stack_count, uint16_t stack_num, struct protocol_stack *last_stack)
+{
+ uint16_t max_index = 0;
+ bool all_same_cnt = true;
+
+ for (uint16_t i = 1; i < stack_num; i++) {
+ if (stack_count[i] != stack_count[0]) {
+ all_same_cnt = false;
+ }
+
+ if (stack_count[i] > stack_count[max_index]) {
+ max_index = i;
+ }
+ }
+
+ /* all stack same, don't change */
+ if (all_same_cnt && last_stack) {
+ return last_stack->queue_id;
+ }
+
+ /* first bind and all stack same. choice tick as queue_id, avoid all bind to statck_0.*/
+ static uint16_t tick = 0;
+ if (all_same_cnt && stack_num) {
+ max_index = atomic_fetch_add(&tick, 1) % stack_num;
+ }
+
+ return max_index;
+}
+
+static void update_epoll_max_stack(struct wakeup_poll *wakeup)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t bind_id = find_max_cnt_stack(wakeup->stack_fd_cnt, stack_group->stack_num, wakeup->max_stack);
+
+ wakeup->max_stack = stack_group->stacks[bind_id];
+}
+
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event)
{
LSTACK_LOG(DEBUG, LSTACK, "op=%d events: fd: %d\n", op, fd);
@@ -185,35 +229,38 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
GAZELLE_RETURN(EINVAL);
}
+ struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
+ if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
+ return posix_api->epoll_ctl_fn(epfd, op, fd, event);
+ }
+
struct lwip_sock *sock = get_socket(fd);
if (sock == NULL) {
+ epoll_sock->wakeup->have_kernel_fd = true;
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
if (CONN_TYPE_HAS_HOST(sock->conn)) {
+ epoll_sock->wakeup->have_kernel_fd = true;
int32_t ret = posix_api->epoll_ctl_fn(epfd, op, fd, event);
if (ret < 0) {
return ret;
}
}
- struct lwip_sock *epoll_sock = get_socket_by_fd(epfd);
- if (epoll_sock == NULL || epoll_sock->wakeup == NULL) {
- LSTACK_LOG(ERR, LSTACK, "epfd=%d\n", fd);
- GAZELLE_RETURN(EINVAL);
- }
-
- uint32_t events = event->events | EPOLLERR | EPOLLHUP;
do {
switch (op) {
case EPOLL_CTL_ADD:
sock->wakeup = epoll_sock->wakeup;
+ if (sock->stack) {
+ epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]++;
+ }
if (list_is_empty(&sock->event_list)) {
list_add_node(&sock->wakeup->event_list, &sock->event_list);
}
/* fall through */
case EPOLL_CTL_MOD:
- sock->epoll_events = events;
+ sock->epoll_events = event->events | EPOLLERR | EPOLLHUP;
sock->ep_data = event->data;
if (sock->conn && NETCONNTYPE_GROUP(netconn_type(sock->conn)) == NETCONN_TCP) {
raise_pending_events(sock);
@@ -222,6 +269,9 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
case EPOLL_CTL_DEL:
list_del_node_init(&sock->event_list);
sock->epoll_events = 0;
+ if (sock->stack) {
+ epoll_sock->wakeup->stack_fd_cnt[sock->stack->queue_id]--;
+ }
break;
default:
GAZELLE_RETURN(EINVAL);
@@ -230,6 +280,7 @@ int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_even
sock = get_socket(fd);
} while (fd > 0 && sock != NULL);
+ update_epoll_max_stack(epoll_sock->wakeup);
return 0;
}
@@ -346,129 +397,196 @@ static int32_t poll_lwip_event(struct pollfd *fds, nfds_t nfds)
return event_num;
}
-static inline bool have_kernel_fd(int32_t epfd, struct pollfd *fds, nfds_t nfds)
+static void ms_to_timespec(struct timespec *timespec, int32_t timeout)
{
- /* when epfd > 0 is epoll type */
- for (uint32_t i = 0; i < nfds && epfd < 0; i++) {
- if (get_socket(fds[i].fd) == NULL) {
- return true;
+ clock_gettime(CLOCK_REALTIME, timespec);
+ timespec->tv_sec += timeout / SEC_TO_MSEC;
+ timespec->tv_nsec += (timeout % SEC_TO_MSEC) * MSEC_TO_NSEC;
+ timespec->tv_sec += timespec->tv_nsec / SEC_TO_NSEC;
+ timespec->tv_nsec = timespec->tv_nsec % SEC_TO_NSEC;
+}
+
+static void change_epollfd_kernel_thread(struct wakeup_poll *wakeup, struct protocol_stack *old_stack,
+ struct protocol_stack *new_stack)
+{
+ if (old_stack) {
+ if (posix_api->epoll_ctl_fn(old_stack->epollfd, EPOLL_CTL_DEL, wakeup->epollfd, NULL) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
}
}
- return false;
+ /* avoid kernel thread post too much, use EPOLLET */
+ struct epoll_event event;
+ event.data.ptr = &wakeup->event_sem;
+ event.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLET;
+ if (posix_api->epoll_ctl_fn(new_stack->epollfd, EPOLL_CTL_ADD, wakeup->epollfd, &event) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
+ }
}
-static inline int32_t poll_kernel_event(struct pollfd *fds, nfds_t nfds)
+static void epoll_bind_statck(struct wakeup_poll *wakeup)
{
- int32_t event_num = 0;
-
- for (uint32_t i = 0; i < nfds; i++) {
- /* lwip event */
- if (get_socket(fds[i].fd) != NULL || fds[i].fd < 0) {
- continue;
- }
-
- int32_t ret = posix_api->poll_fn(&fds[i], 1, 0);
- if (ret < 0) {
- if (errno != EINTR) {
- return ret;
- }
- } else {
- event_num += ret;
- }
+ /* all fd is kernel, set rand stack */
+ if (wakeup->bind_stack == NULL && wakeup->max_stack== NULL) {
+ update_epoll_max_stack(wakeup);
}
- return event_num;
+ if (wakeup->bind_stack != wakeup->max_stack && wakeup->max_stack) {
+ bind_to_stack_numa(wakeup->max_stack);
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, wakeup->max_stack);
+ wakeup->bind_stack = wakeup->max_stack;
+ }
}
-static int32_t get_event(struct wakeup_poll *wakeup, int32_t epfd, void *out, int32_t maxevents, int32_t timeout)
+int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
{
- struct pollfd *fds = (struct pollfd *)out;
- struct epoll_event *events = (struct epoll_event *)out;
- bool have_kernel = have_kernel_fd(epfd, fds, maxevents);
+ struct lwip_sock *sock = get_socket_by_fd(epfd);
+ if (sock == NULL || sock->wakeup == NULL) {
+ return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ }
+
int32_t event_num = 0;
- int32_t poll_time = 0;
int32_t ret;
- /* when epfd > 0 is epoll type */
+ struct timespec epoll_time;
+ if (timeout >= 0) {
+ ms_to_timespec(&epoll_time, timeout);
+ }
+
+ epoll_bind_statck(sock->wakeup);
+
do {
- event_num += (epfd > 0) ? epoll_lwip_event(wakeup, &events[event_num], maxevents - event_num) :
- poll_lwip_event(fds, maxevents);
-
- if (have_kernel) {
- int32_t event_kernel_num = (epfd > 0) ?
- posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0) :
- poll_kernel_event(fds, maxevents);
- if (event_kernel_num < 0) {
- return event_kernel_num;
- }
- event_num += event_kernel_num;
- if (timeout >= 0 && poll_time >= timeout) {
- break;
- }
- poll_time += EPOLL_KERNEL_INTERVAL;
+ event_num += epoll_lwip_event(sock->wakeup, &events[event_num], maxevents - event_num);
+
+ if (sock->wakeup->have_kernel_fd) {
+ event_num += posix_api->epoll_wait_fn(epfd, &events[event_num], maxevents - event_num, 0);
}
if (event_num > 0) {
break;
}
- int32_t interval = (have_kernel) ? EPOLL_KERNEL_INTERVAL : timeout;
- struct timespec epoll_interval;
- clock_gettime(CLOCK_REALTIME, &epoll_interval);
- epoll_interval.tv_sec += interval / 1000;
- epoll_interval.tv_nsec += (interval % 1000) * 1000000;
- epoll_interval.tv_sec += epoll_interval.tv_nsec / 1000000000;
- epoll_interval.tv_nsec = epoll_interval.tv_nsec % 1000000000;
-
- if (timeout < 0 && !have_kernel) {
- ret = sem_wait(&wakeup->event_sem);
+ if (timeout < 0) {
+ ret = sem_wait(&sock->wakeup->event_sem);
} else {
- ret = sem_timedwait(&wakeup->event_sem, &epoll_interval);
+ ret = sem_timedwait(&sock->wakeup->event_sem, &epoll_time);
}
-
- if (!have_kernel && ret < 0) {
- break;
- }
- } while (event_num <= maxevents);
+ } while (ret == 0);
return event_num;
}
-int32_t lstack_epoll_wait(int32_t epfd, struct epoll_event* events, int32_t maxevents, int32_t timeout)
+static void init_poll_wakeup_data(struct wakeup_poll *wakeup)
{
- /* avoid the starvation of epoll events from both netstack */
- maxevents = LWIP_MIN(LWIP_EPOOL_MAX_EVENTS, maxevents);
+ sem_init(&wakeup->event_sem, 0, 0);
- struct lwip_sock *sock = get_socket_by_fd(epfd);
- if (sock == NULL) {
- GAZELLE_RETURN(EINVAL);
+ wakeup->last_fds = calloc(POLL_KERNEL_EVENTS, sizeof(struct pollfd));
+ if (wakeup->last_fds == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
}
- if (sock->wakeup == NULL) {
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
+ wakeup->events = calloc(POLL_KERNEL_EVENTS, sizeof(struct epoll_event));
+ if (wakeup->events == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
}
- return get_event(sock->wakeup, epfd, events, maxevents, timeout);
+ wakeup->last_max_nfds = POLL_KERNEL_EVENTS;
+
+ wakeup->epollfd = posix_api->epoll_create_fn(POLL_KERNEL_EVENTS);
+ if (wakeup->epollfd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_create_fn errno=%d\n", errno);
+ }
}
-static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeup)
+static void resize_kernel_poll(struct wakeup_poll *wakeup, nfds_t nfds)
{
- int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+ wakeup->last_fds = realloc(wakeup->last_fds, nfds * sizeof(struct pollfd));
+ if (wakeup->last_fds == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
+ }
+
+ wakeup->events = realloc(wakeup->events, nfds * sizeof(struct epoll_event));
+ if (wakeup->events == NULL) {
+ LSTACK_LOG(ERR, LSTACK, "calloc failed errno=%d\n", errno);
+ }
+
+ wakeup->last_max_nfds = nfds;
+ memset_s(wakeup->last_fds, nfds * sizeof(struct pollfd), 0, nfds * sizeof(struct pollfd));
+}
+
+static void poll_bind_statck(struct wakeup_poll *wakeup, int32_t *stack_count)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t bind_id = find_max_cnt_stack(stack_count, stack_group->stack_num, wakeup->bind_stack);
+
+ if (wakeup->bind_stack && wakeup->bind_stack->queue_id == bind_id) {
+ return;
+ }
+ change_epollfd_kernel_thread(wakeup, wakeup->bind_stack, stack_group->stacks[bind_id]);
+ bind_to_stack_numa(stack_group->stacks[bind_id]);
+ wakeup->bind_stack = stack_group->stacks[bind_id];
+}
+
+static void update_kernel_poll(struct wakeup_poll *wakeup, uint32_t index, struct pollfd *new_fd)
+{
+ posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_DEL, wakeup->last_fds[index].fd, NULL);
+
+ if (new_fd == NULL) {
+ return;
+ }
+
+ struct epoll_event event;
+ event.data.u32 = index;
+ event.events = new_fd->events;
+ if (posix_api->epoll_ctl_fn(wakeup->epollfd, EPOLL_CTL_ADD, new_fd->fd, &event) != 0) {
+ LSTACK_LOG(ERR, LSTACK, "epoll_ctl_fn errno=%d\n", errno);
+ }
+
+ wakeup->last_fds[index].fd = new_fd->fd;
+ wakeup->last_fds[index].events = new_fd->events;
+
+ wakeup->have_kernel_fd = true;
+}
+
+static void poll_init(struct wakeup_poll *wakeup, struct pollfd *fds, nfds_t nfds)
+{
if (!wakeup->init) {
wakeup->init = true;
- sem_init(&wakeup->event_sem, 0, 0);
+ init_poll_wakeup_data(wakeup);
} else {
while (sem_trywait(&wakeup->event_sem) == 0) {}
}
+ if (nfds > wakeup->last_max_nfds) {
+ resize_kernel_poll(wakeup, nfds);
+ }
+
+ int32_t stack_count[PROTOCOL_STACK_MAX] = {0};
+ int32_t poll_change = 0;
+
+ /* poll fds num less, del old fd */
+ for (uint32_t i = nfds; i < wakeup->last_nfds; i++) {
+ update_kernel_poll(wakeup, i, NULL);
+ poll_change = 1;
+ }
+
for (uint32_t i = 0; i < nfds; i++) {
- int32_t fd = fds[i].fd;
fds[i].revents = 0;
+ if (fds[i].fd == wakeup->last_fds[i].fd && fds[i].events == wakeup->last_fds[i].events) {
+ continue;
+ }
+ poll_change = 1;
+
+ int32_t fd = fds[i].fd;
+ struct lwip_sock *sock = get_socket(fd);
+ if (sock == NULL || CONN_TYPE_HAS_HOST(sock->conn)) {
+ update_kernel_poll(wakeup, i, fds + i);
+ }
+
do {
- struct lwip_sock *sock = get_socket(fd);
+ sock = get_socket(fd);
if (sock == NULL || sock->conn == NULL) {
break;
}
@@ -481,25 +599,50 @@ static void poll_init(struct pollfd *fds, nfds_t nfds, struct wakeup_poll *wakeu
} while (fd > 0);
}
- if (wakeup->bind_stack) {
+ wakeup->last_nfds = nfds;
+ if (poll_change == 0) {
return;
}
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- uint32_t bind_id = 0;
- for (uint32_t i = 0; i < stack_group->stack_num; i++) {
- if (stack_count[i] > stack_count[bind_id]) {
- bind_id = i;
- }
- }
-
- bind_to_stack_numa(stack_group->stacks[bind_id]);
- wakeup->bind_stack = stack_group->stacks[bind_id];
+ poll_bind_statck(wakeup, stack_count);
}
int32_t lstack_poll(struct pollfd *fds, nfds_t nfds, int32_t timeout)
{
- poll_init(fds, nfds, &g_wakeup_poll);
+ poll_init(&g_wakeup_poll, fds, nfds);
- return get_event(&g_wakeup_poll, -1, fds, nfds, timeout);
+ int32_t event_num = 0;
+ int32_t ret;
+
+ struct timespec poll_time;
+ if (timeout >= 0) {
+ ms_to_timespec(&poll_time, timeout);
+ }
+
+ /* when epfd > 0 is epoll type */
+ do {
+ event_num += poll_lwip_event(fds, nfds);
+
+ /* reduce syscall epoll_wait */
+ if (g_wakeup_poll.have_kernel_fd) {
+ int32_t kernel_num = posix_api->epoll_wait_fn(g_wakeup_poll.epollfd, g_wakeup_poll.events, nfds, 0);
+ for (int32_t i = 0; i < kernel_num; i++) {
+ uint32_t index = g_wakeup_poll.events[i].data.u32;
+ fds[index].revents = g_wakeup_poll.events[i].events;
+ }
+ event_num += kernel_num >= 0 ? kernel_num : 0;
+ }
+
+ if (event_num > 0) {
+ break;
+ }
+
+ if (timeout < 0) {
+ ret = sem_wait(&g_wakeup_poll.event_sem);
+ } else {
+ ret = sem_timedwait(&g_wakeup_poll.event_sem, &poll_time);
+ }
+ } while (ret == 0);
+
+ return event_num;
}
diff --git a/src/lstack/api/lstack_wrap.c b/src/lstack/api/lstack_wrap.c
index f623da3..bf5dcb4 100644
--- a/src/lstack/api/lstack_wrap.c
+++ b/src/lstack/api/lstack_wrap.c
@@ -45,8 +45,7 @@ enum KERNEL_LWIP_PATH {
static inline enum KERNEL_LWIP_PATH select_path(int fd)
{
if (posix_api == NULL) {
- /* link liblstack.so using LD_PRELOAD mode will read liblstack.so,
- poisx_api need to be initialized here */
+ /* posix api maybe call before gazelle init */
if (posix_api_init() != 0) {
LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n");
}
@@ -78,8 +77,7 @@ static inline enum KERNEL_LWIP_PATH select_path(int fd)
static inline int32_t do_epoll_create(int32_t size)
{
if (posix_api == NULL) {
- /* link liblstack.so using LD_PRELOAD mode will read liblstack.so,
- poisx_api need to be initialized here */
+ /* posix api maybe call before gazelle init */
if (posix_api_init() != 0) {
LSTACK_PRE_LOG(LSTACK_ERR, "posix_api_init failed\n");
}
@@ -99,11 +97,6 @@ static inline int32_t do_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct
return posix_api->epoll_ctl_fn(epfd, op, fd, event);
}
- struct lwip_sock *sock = get_socket_by_fd(epfd);
- if (sock == NULL || sock->wakeup == NULL) {
- return posix_api->epoll_ctl_fn(epfd, op, fd, event);
- }
-
return lstack_epoll_ctl(epfd, op, fd, event);
}
@@ -113,11 +106,6 @@ static inline int32_t do_epoll_wait(int32_t epfd, struct epoll_event* events, in
return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
}
- struct lwip_sock *sock = get_socket_by_fd(epfd);
- if (sock == NULL || sock->wakeup == NULL) {
- return posix_api->epoll_wait_fn(epfd, events, maxevents, timeout);
- }
-
if (epfd < 0) {
GAZELLE_RETURN(EBADF);
}
@@ -362,6 +350,11 @@ static inline ssize_t do_sendmsg(int32_t s, const struct msghdr *message, int32_
static inline int32_t do_close(int32_t s)
{
+ struct lwip_sock *sock = get_socket_by_fd(s);
+ if (sock && sock->wakeup && sock->wakeup->epollfd == s) {
+ return lstack_epoll_close(s);
+ }
+
if (select_path(s) == PATH_KERNEL) {
return posix_api->close_fn(s);
}
diff --git a/src/lstack/core/lstack_dpdk.c b/src/lstack/core/lstack_dpdk.c
index aa91201..cdd2c05 100644
--- a/src/lstack/core/lstack_dpdk.c
+++ b/src/lstack/core/lstack_dpdk.c
@@ -95,8 +95,8 @@ int32_t dpdk_eal_init(void)
if (ret < 0) {
if (rte_errno == EALREADY) {
LSTACK_PRE_LOG(LSTACK_INFO, "rte_eal_init aleady init\n");
- /* maybe other program inited, merge init param share init */
- ret = 0;
+ /* maybe other program inited, merge init param share init */
+ ret = 0;
}
else {
LSTACK_PRE_LOG(LSTACK_ERR, "rte_eal_init failed init, rte_errno %d\n", rte_errno);
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 037b8fd..f8e96bf 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -270,7 +270,7 @@ __attribute__((constructor)) void gazelle_network_init(void)
lwip_sock_init();
/* wait stack thread and kernel_event thread init finish */
- wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num);
+ wait_sem_value(&get_protocol_stack_group()->all_init, get_protocol_stack_group()->stack_num * 2);
if (g_init_fail) {
LSTACK_EXIT(1, "stack thread or kernel_event thread failed\n");
}
diff --git a/src/lstack/core/lstack_lwip.c b/src/lstack/core/lstack_lwip.c
index 00a82fb..8544ef7 100644
--- a/src/lstack/core/lstack_lwip.c
+++ b/src/lstack/core/lstack_lwip.c
@@ -29,6 +29,7 @@
#include "lstack_log.h"
#include "lstack_dpdk.h"
#include "lstack_stack_stat.h"
+#include "posix/lstack_epoll.h"
#include "lstack_lwip.h"
#define HALF_DIVISOR (2)
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 565d19b..eb975c0 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -35,6 +35,7 @@
#define READ_LIST_MAX 32
#define SEND_LIST_MAX 32
#define HANDLE_RPC_MSG_MAX 32
+#define KERNEL_EPOLL_MAX 256
static PER_THREAD uint16_t g_stack_idx = PROTOCOL_STACK_MAX;
static struct protocol_stack_group g_stack_group = {0};
@@ -43,9 +44,6 @@ static PER_THREAD long g_stack_tid = 0;
void set_init_fail(void);
typedef void *(*stack_thread_func)(void *arg);
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
-void update_stack_events(struct protocol_stack *stack);
-#endif
int32_t bind_to_stack_numa(struct protocol_stack *stack)
{
@@ -206,6 +204,10 @@ static void* gazelle_weakup_thread(void *arg)
LSTACK_LOG(INFO, LSTACK, "weakup_%02d start\n", stack->queue_id);
for (;;) {
+ if (rte_ring_count(stack->wakeup_ring) == 0) {
+ continue;
+ }
+
sem_t *event_sem;
if (rte_ring_sc_dequeue(stack->wakeup_ring, (void **)&event_sem)) {
continue;
@@ -268,6 +270,61 @@ static int32_t init_stack_value(struct protocol_stack *stack, uint16_t queue_id)
return 0;
}
+static void* gazelle_kernel_event(void *arg)
+{
+ uint16_t queue_id = *(uint16_t *)arg;
+
+ int32_t epoll_fd = posix_api->epoll_create_fn(GAZELLE_LSTACK_MAX_CONN);
+ if (epoll_fd < 0) {
+ LSTACK_LOG(ERR, LSTACK, "queue_id=%d epoll_fd=%d errno=%d\n", queue_id, epoll_fd, errno);
+ /* exit in main thread, avoid create mempool and exit at the same time */
+ set_init_fail();
+ sem_post(&get_protocol_stack_group()->all_init);
+ return NULL;
+ }
+
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[queue_id];
+ stack->epollfd = epoll_fd;
+
+ sem_post(&get_protocol_stack_group()->all_init);
+ LSTACK_LOG(INFO, LSTACK, "kernel_event_%02d start\n", stack->queue_id);
+
+ struct epoll_event events[KERNEL_EPOLL_MAX];
+ for (;;) {
+ int32_t event_num = posix_api->epoll_wait_fn(epoll_fd, events, KERNEL_EPOLL_MAX, -1);
+ if (event_num <= 0) {
+ continue;
+ }
+
+ for (int32_t i = 0; i < event_num; i++) {
+ if (events[i].data.ptr) {
+ sem_post((sem_t *)events[i].data.ptr);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+static int32_t create_companion_thread(struct protocol_stack_group *stack_group, struct protocol_stack *stack)
+{
+ int32_t ret;
+
+ if (stack_group->wakeup_enable) {
+ ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "gazelleweakup ret=%d errno=%d\n", ret, errno);
+ return ret;
+ }
+ }
+
+ ret = create_thread(stack->queue_id, "gazellekernel", gazelle_kernel_event);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "gazellekernelEvent ret=%d errno=%d\n", ret, errno);
+ }
+ return ret;
+}
+
void wait_sem_value(sem_t *sem, int32_t wait_value)
{
int32_t sem_val;
@@ -315,12 +372,9 @@ static struct protocol_stack * stack_thread_init(uint16_t queue_id)
return NULL;
}
- if (stack_group->wakeup_enable) {
- int32_t ret = create_thread(stack->queue_id, "gazelleweakup", gazelle_weakup_thread);
- if (ret != 0) {
- free(stack);
- return NULL;
- }
+ if (create_companion_thread(stack_group, stack) != 0) {
+ free(stack);
+ return NULL;
}
return stack;
@@ -338,6 +392,7 @@ static void* gazelle_stack_thread(void *arg)
LSTACK_LOG(ERR, LSTACK, "stack_thread_init failed queue_id=%d\n", queue_id);
return NULL;
}
+
sem_post(&get_protocol_stack_group()->all_init);
LSTACK_LOG(INFO, LSTACK, "stack_%02d init success\n", queue_id);
@@ -351,10 +406,6 @@ static void* gazelle_stack_thread(void *arg)
send_stack_list(stack, SEND_LIST_MAX);
sys_timer_run();
-
-#ifdef GAZELLE_USE_EPOLL_EVENT_STACK
- update_stack_events(stack);
-#endif
}
return NULL;
@@ -378,7 +429,13 @@ static int32_t init_protocol_sem(void)
LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno);
return -1;
}
-
+
+ ret = sem_init(&stack_group->all_init, 0, 0);
+ if (ret < 0) {
+ LSTACK_LOG(ERR, PORT, "sem_init failed ret=%d errno=%d\n", ret, errno);
+ return -1;
+ }
+
return 0;
}
diff --git a/src/lstack/include/lstack_cfg.h b/src/lstack/include/lstack_cfg.h
index 345a373..987828d 100644
--- a/src/lstack/include/lstack_cfg.h
+++ b/src/lstack/include/lstack_cfg.h
@@ -33,7 +33,6 @@
#define LOG_DIR_PATH PATH_MAX
#define LOG_LEVEL_LEN 16
#define GAZELLE_MAX_NUMA_NODES 8
-#define LWIP_EPOOL_MAX_EVENTS 512
/* Default value of low power mode parameters */
#define LSTACK_LPM_DETECT_MS_MIN (5 * 1000)
diff --git a/src/lstack/include/lstack_protocol_stack.h b/src/lstack/include/lstack_protocol_stack.h
index 9852878..bc4e4bd 100644
--- a/src/lstack/include/lstack_protocol_stack.h
+++ b/src/lstack/include/lstack_protocol_stack.h
@@ -51,6 +51,7 @@ struct protocol_stack {
struct list_node send_list;
struct list_node event_list;
pthread_spinlock_t event_lock;
+ int32_t epollfd; /* kernel event thread epoll fd */
struct gazelle_stat_pkts stats;
struct gazelle_stack_latency latency;
@@ -75,13 +76,6 @@ struct protocol_stack_group {
uint64_t call_alloc_fail;
};
-struct wakeup_poll {
- bool init;
- struct protocol_stack *bind_stack;
- struct list_node event_list; /* epoll temp use poll */
- sem_t event_sem;
-};
-
long get_stack_tid(void);
struct protocol_stack *get_protocol_stack(void);
struct protocol_stack *get_protocol_stack_by_fd(int32_t fd);
diff --git a/src/lstack/include/posix/lstack_epoll.h b/src/lstack/include/posix/lstack_epoll.h
index cac640b..a83f41f 100644
--- a/src/lstack/include/posix/lstack_epoll.h
+++ b/src/lstack/include/posix/lstack_epoll.h
@@ -18,6 +18,30 @@ extern "C" {
#endif
#include <poll.h>
+#include <stdbool.h>
+#include <semaphore.h>
+
+#include "lstack_protocol_stack.h"
+
+struct wakeup_poll {
+ bool init;
+ struct protocol_stack *bind_stack;
+ sem_t event_sem;
+
+ int32_t epollfd;
+ bool have_kernel_fd;
+
+ /* poll */
+ struct pollfd *last_fds;
+ nfds_t last_nfds;
+ nfds_t last_max_nfds;
+ struct epoll_event *events;
+
+ /* epoll */
+ int32_t stack_fd_cnt[PROTOCOL_STACK_MAX];
+ struct protocol_stack *max_stack;
+ struct list_node event_list; /* epoll temp use */
+};
int32_t lstack_epoll_create(int32_t size);
int32_t lstack_epoll_ctl(int32_t epfd, int32_t op, int32_t fd, struct epoll_event *event);
--
2.23.0
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wu-changsheng/gazelle-tar.git
git@gitee.com:wu-changsheng/gazelle-tar.git
wu-changsheng
gazelle-tar
gazelle-tar
master

搜索帮助